diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 3f11a4823a..4eef78acf0 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -71,6 +71,7 @@ const ( signozSampleLocalTableName = "samples_v2" signozSampleTableName = "distributed_samples_v2" signozTSTableName = "distributed_time_series_v2" + signozTSTableNameV41Day = "distributed_time_series_v4_1day" minTimespanForProgressiveSearch = time.Hour minTimespanForProgressiveSearchMargin = time.Minute @@ -3287,7 +3288,7 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s metricNameToTemporality := make(map[string]map[v3.Temporality]bool) - query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableName) + query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day) rows, err := r.db.Query(ctx, query, metricNames) if err != nil { @@ -3309,15 +3310,6 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s return metricNameToTemporality, nil } -// func sum(array []tsByMetricName) uint64 { -// var result uint64 -// result = 0 -// for _, v := range array { -// result += v.count -// } -// return result -// } - func (r *ClickHouseReader) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) { queryStr := fmt.Sprintf("SELECT count() as count from %s.%s group by metric_name order by count desc;", signozMetricDBName, signozTSTableName) @@ -3960,7 +3952,7 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req var rows driver.Rows var response v3.AggregateAttributeResponse - query = fmt.Sprintf("SELECT DISTINCT(metric_name) from %s.%s WHERE metric_name ILIKE $1", signozMetricDBName, signozTSTableName) + query = fmt.Sprintf("SELECT DISTINCT metric_name, type from %s.%s WHERE metric_name ILIKE $1", signozMetricDBName, signozTSTableNameV41Day) if req.Limit != 0 { query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) } @@ -3972,15 +3964,16 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req } defer rows.Close() - var metricName string + var metricName, typ string for rows.Next() { - if err := rows.Scan(&metricName); err != nil { + if err := rows.Scan(&metricName, &typ); err != nil { return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) } + // unlike traces/logs `tag`/`resource` type, the `Type` will be metric type key := v3.AttributeKey{ Key: metricName, DataType: v3.AttributeKeyDataTypeFloat64, - Type: v3.AttributeKeyTypeUnspecified, + Type: v3.AttributeKeyType(typ), IsColumn: true, } response.AttributeKeys = append(response.AttributeKeys, key) @@ -4111,6 +4104,68 @@ func (r *ClickHouseReader) GetLatencyMetricMetadata(ctx context.Context, metricN }, nil } +func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, serviceName string) (*v3.MetricMetadataResponse, error) { + // Note: metric metadata should be accessible regardless of the time range selection + // our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the + // amount of data scanned + query := fmt.Sprintf("SELECT DISTINCT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1", signozMetricDBName, signozTSTableNameV41Day) + rows, err := r.db.Query(ctx, query, metricName) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error()) + } + defer rows.Close() + + var deltaExists, isMonotonic bool + var temporality, description, metricType, unit string + for rows.Next() { + if err := rows.Scan(&temporality, &description, &metricType, &unit, &isMonotonic); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + if temporality == string(v3.Delta) { + deltaExists = true + } + } + + query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, 'le')) as le from %s.%s WHERE metric_name=$1 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $2 ORDER BY le", signozMetricDBName, signozTSTableNameV41Day) + rows, err = r.db.Query(ctx, query, metricName, serviceName) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var leFloat64 []float64 + for rows.Next() { + var leStr string + if err := rows.Scan(&leStr); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + le, err := strconv.ParseFloat(leStr, 64) + // ignore the error and continue if the value is not a float + // ideally this should not happen but we have seen ClickHouse + // returning empty string for some values + if err != nil { + zap.S().Error("error while parsing le value: ", err) + continue + } + if math.IsInf(le, 0) { + continue + } + leFloat64 = append(leFloat64, le) + } + + return &v3.MetricMetadataResponse{ + Delta: deltaExists, + Le: leFloat64, + Description: description, + Unit: unit, + Type: metricType, + IsMonotonic: isMonotonic, + Temporality: temporality, + }, nil +} + func isColumn(tableStatement, attrType, field, datType string) bool { // value of attrType will be `resource` or `tag`, if `tag` change it to `attribute` name := utils.GetClickhouseColumnName(attrType, datType, field) diff --git a/pkg/query-service/app/formula.go b/pkg/query-service/app/formula.go index 2ead902172..657a7bcad9 100644 --- a/pkg/query-service/app/formula.go +++ b/pkg/query-service/app/formula.go @@ -154,6 +154,11 @@ func processResults(results []*v3.Result, expression *govaluate.EvaluableExpress return nil, err } if series != nil { + labelsArray := make([]map[string]string, 0) + for k, v := range series.Labels { + labelsArray = append(labelsArray, map[string]string{k: v}) + } + series.LabelsArray = labelsArray newSeries = append(newSeries, series) } } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index e548d9e6b4..fbddb704aa 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -84,6 +84,12 @@ type APIHandler struct { preferDelta bool preferSpanMetrics bool + // temporalityMap is a map of metric name to temporality + // to avoid fetching temporality for the same metric multiple times + // querying the v4 table on low cardinal temporality column + // should be fast but we can still avoid the query if we have the data in memory + temporalityMap map[string]map[v3.Temporality]bool + maxIdleConns int maxOpenConns int dialTimeout time.Duration @@ -161,6 +167,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { skipConfig: opts.SkipConfig, preferDelta: opts.PerferDelta, preferSpanMetrics: opts.PreferSpanMetrics, + temporalityMap: make(map[string]map[v3.Temporality]bool), maxIdleConns: opts.MaxIdleConns, maxOpenConns: opts.MaxOpenConns, dialTimeout: opts.DialTimeout, @@ -335,6 +342,7 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid func (aH *APIHandler) RegisterQueryRangeV4Routes(router *mux.Router, am *AuthMiddleware) { subRouter := router.PathPrefix("/api/v4").Subrouter() subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost) + subRouter.HandleFunc("/metric/metric_metadata", am.ViewAccess(aH.getMetricMetadata)).Methods(http.MethodGet) } func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) { @@ -537,7 +545,7 @@ func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParam metricNameToTemporality := make(map[string]map[v3.Temporality]bool) if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { for _, query := range qp.CompositeQuery.BuilderQueries { - if query.DataSource == v3.DataSourceMetrics { + if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { metricNames = append(metricNames, query.AggregateAttribute.Key) if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok { metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool) @@ -782,6 +790,58 @@ func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request aH.Respond(w, resp) } +// populateTemporality same as addTemporality but for v4 and better +func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { + + missingTemporality := make([]string, 0) + metricNameToTemporality := make(map[string]map[v3.Temporality]bool) + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for _, query := range qp.CompositeQuery.BuilderQueries { + // if there is no temporality specified in the query but we have it in the map + // then use the value from the map + if query.Temporality == "" && aH.temporalityMap[query.AggregateAttribute.Key] != nil { + // We prefer delta if it is available + if aH.temporalityMap[query.AggregateAttribute.Key][v3.Delta] { + query.Temporality = v3.Delta + } else if aH.temporalityMap[query.AggregateAttribute.Key][v3.Cumulative] { + query.Temporality = v3.Cumulative + } else { + query.Temporality = v3.Unspecified + } + } + // we don't have temporality for this metric + if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { + missingTemporality = append(missingTemporality, query.AggregateAttribute.Key) + } + if _, ok := metricNameToTemporality[query.AggregateAttribute.Key]; !ok { + metricNameToTemporality[query.AggregateAttribute.Key] = make(map[v3.Temporality]bool) + } + } + } + + nameToTemporality, err := aH.reader.FetchTemporality(ctx, missingTemporality) + if err != nil { + return err + } + + if qp.CompositeQuery != nil && len(qp.CompositeQuery.BuilderQueries) > 0 { + for name := range qp.CompositeQuery.BuilderQueries { + query := qp.CompositeQuery.BuilderQueries[name] + if query.DataSource == v3.DataSourceMetrics && query.Temporality == "" { + if nameToTemporality[query.AggregateAttribute.Key][v3.Delta] { + query.Temporality = v3.Delta + } else if nameToTemporality[query.AggregateAttribute.Key][v3.Cumulative] { + query.Temporality = v3.Cumulative + } else { + query.Temporality = v3.Unspecified + } + aH.temporalityMap[query.AggregateAttribute.Key] = nameToTemporality[query.AggregateAttribute.Key] + } + } + } + return nil +} + func (aH *APIHandler) listRules(w http.ResponseWriter, r *http.Request) { rules, err := aH.ruleManager.ListRuleStates(r.Context()) @@ -3179,7 +3239,7 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) { zap.S().Debug("done!") return case err := <-client.Error: - zap.S().Error("error occured!", err) + zap.S().Error("error occurred!", err) fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error()) flusher.Flush() return @@ -3187,6 +3247,18 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) { } } +func (aH *APIHandler) getMetricMetadata(w http.ResponseWriter, r *http.Request) { + metricName := r.URL.Query().Get("metricName") + serviceName := r.URL.Query().Get("serviceName") + metricMetadata, err := aH.reader.GetMetricMetadata(r.Context(), metricName, serviceName) + if err != nil { + RespondError(w, &model.ApiError{Err: err, Typ: model.ErrorInternal}, nil) + return + } + + aH.WriteJSON(w, r, metricMetadata) +} + func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3, w http.ResponseWriter, r *http.Request) { var result []*v3.Result @@ -3250,8 +3322,7 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) { } // add temporality for each metric - - temporalityErr := aH.addTemporality(r.Context(), queryRangeParams) + temporalityErr := aH.populateTemporality(r.Context(), queryRangeParams) if temporalityErr != nil { zap.S().Errorf("Error while adding temporality for metrics: %v", temporalityErr) RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: temporalityErr}, nil) @@ -3305,9 +3376,24 @@ func postProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam zap.S().Errorf("error in expression: %s", err.Error()) return nil, err } + formulaResult.QueryName = query.QueryName result = append(result, formulaResult) } } + // we are done with the formula calculations, only send the results for enabled queries + removeDisabledQueries := func(result []*v3.Result) []*v3.Result { + var newResult []*v3.Result + for _, res := range result { + if queryRangeParams.CompositeQuery.BuilderQueries[res.QueryName].Disabled { + continue + } + newResult = append(newResult, res) + } + return newResult + } + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { + result = removeDisabledQueries(result) + } return result, nil } diff --git a/pkg/query-service/app/metrics/v4/cumulative/table_test.go b/pkg/query-service/app/metrics/v4/cumulative/table_test.go index d562b5d93a..ebdaa51182 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/table_test.go +++ b/pkg/query-service/app/metrics/v4/cumulative/table_test.go @@ -51,7 +51,7 @@ func TestPrepareTableQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", }, { name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative", @@ -93,7 +93,7 @@ func TestPrepareTableQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", }, } diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go index 7dfa8fef87..9845096223 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/timeseries.go +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries.go @@ -107,19 +107,19 @@ const ( func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) (string, error) { var subQuery string - timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq) + timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(start, end, mq) if err != nil { return "", err } - samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) // Select the aggregate value for interval queryTmpl := "SELECT fingerprint, %s" + - " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " %s as per_series_value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + diff --git a/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go b/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go index 91dd1c4a1e..9e31d0f0e7 100644 --- a/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go +++ b/pkg/query-service/app/metrics/v4/cumulative/timeseries_test.go @@ -66,7 +66,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts", + expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts", }, { name: "test time aggregation = rate, temporality = cumulative", @@ -107,7 +107,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)", + expectedQueryContains: "SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)", }, } @@ -168,7 +168,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", }, { name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative", @@ -210,7 +210,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", }, } diff --git a/pkg/query-service/app/metrics/v4/delta/table_test.go b/pkg/query-service/app/metrics/v4/delta/table_test.go index c7bce4268c..54a35e40e4 100644 --- a/pkg/query-service/app/metrics/v4/delta/table_test.go +++ b/pkg/query-service/app/metrics/v4/delta/table_test.go @@ -53,7 +53,7 @@ func TestPrepareTableQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", }, { name: "test time aggregation = rate, space aggregation = sum, temporality = delta", @@ -95,7 +95,7 @@ func TestPrepareTableQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", }, } diff --git a/pkg/query-service/app/metrics/v4/delta/time_series_test.go b/pkg/query-service/app/metrics/v4/delta/time_series_test.go index 024371d328..0af2c91154 100644 --- a/pkg/query-service/app/metrics/v4/delta/time_series_test.go +++ b/pkg/query-service/app/metrics/v4/delta/time_series_test.go @@ -66,7 +66,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts", + expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts", }, { name: "test time aggregation = rate, temporality = delta", @@ -107,7 +107,7 @@ func TestPrepareTimeAggregationSubQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts", + expectedQueryContains: "SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts", }, } @@ -168,7 +168,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'system_memory_usage' AND temporality = 'Unspecified' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND JSONExtractString(labels, 'state') != 'idle') as filtered_time_series USING fingerprint WHERE metric_name = 'system_memory_usage' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", }, { name: "test time aggregation = rate, space aggregation = sum, temporality = delta", @@ -210,7 +210,7 @@ func TestPrepareTimeseriesQuery(t *testing.T) { }, start: 1701794980000, end: 1701796780000, - expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND timestamp_ms >= 1701794980000 AND timestamp_ms <= 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1701792000000 AND unix_milli < 1701796780000 AND like(JSONExtractString(labels, 'service_name'), '%payment_service%')) as filtered_time_series USING fingerprint WHERE metric_name = 'http_requests' AND unix_milli >= 1701794980000 AND unix_milli < 1701796780000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", }, } diff --git a/pkg/query-service/app/metrics/v4/delta/timeseries.go b/pkg/query-service/app/metrics/v4/delta/timeseries.go index 3d6999f425..03781dfcd1 100644 --- a/pkg/query-service/app/metrics/v4/delta/timeseries.go +++ b/pkg/query-service/app/metrics/v4/delta/timeseries.go @@ -14,19 +14,19 @@ func prepareTimeAggregationSubQuery(start, end, step int64, mq *v3.BuilderQuery) var subQuery string - timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq) + timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(start, end, mq) if err != nil { return "", err } - samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) // Select the aggregate value for interval queryTmpl := "SELECT fingerprint, %s" + - " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " %s as per_series_value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + @@ -77,19 +77,19 @@ func prepareQueryOptimized(start, end, step int64, mq *v3.BuilderQuery) (string, var query string - timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(mq) + timeSeriesSubQuery, err := helpers.PrepareTimeseriesFilterQuery(start, end, mq) if err != nil { return "", err } - samplesTableFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) + samplesTableFilter := fmt.Sprintf("metric_name = %s AND unix_milli >= %d AND unix_milli < %d", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key), start, end) // Select the aggregate value for interval queryTmpl := "SELECT %s" + - " toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," + + " toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL %d SECOND) as ts," + " %s as value" + - " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME + + " FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_V4_TABLENAME + " INNER JOIN" + " (%s) as filtered_time_series" + " USING fingerprint" + diff --git a/pkg/query-service/app/metrics/v4/helpers/clauses.go b/pkg/query-service/app/metrics/v4/helpers/clauses.go index 06f4b13cea..8714df51da 100644 --- a/pkg/query-service/app/metrics/v4/helpers/clauses.go +++ b/pkg/query-service/app/metrics/v4/helpers/clauses.go @@ -37,6 +37,17 @@ func GroupByAttributeKeyTags(tags ...v3.AttributeKey) string { return strings.Join(groupTags, ", ") } +func GroupByAttributeKeyTagsWithoutLe(tags ...v3.AttributeKey) string { + groupTags := []string{} + for _, tag := range tags { + if tag.Key != "le" { + groupTags = append(groupTags, tag.Key) + } + } + groupTags = append(groupTags, "ts") + return strings.Join(groupTags, ", ") +} + // OrderByAttributeKeyTags returns a string of comma separated tags for order by clause // if the order is not specified, it defaults to ASC func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string { @@ -60,6 +71,29 @@ func OrderByAttributeKeyTags(items []v3.OrderBy, tags []v3.AttributeKey) string return strings.Join(orderBy, ", ") } +func OrderByAttributeKeyTagsWithoutLe(items []v3.OrderBy, tags []v3.AttributeKey) string { + var orderBy []string + for _, tag := range tags { + if tag.Key != "le" { + found := false + for _, item := range items { + if item.ColumnName == tag.Key { + found = true + orderBy = append(orderBy, fmt.Sprintf("%s %s", item.ColumnName, item.Order)) + break + } + } + if !found { + orderBy = append(orderBy, fmt.Sprintf("%s ASC", tag.Key)) + } + } + } + + orderBy = append(orderBy, "ts ASC") + + return strings.Join(orderBy, ", ") +} + func SelectLabelsAny(tags []v3.AttributeKey) string { var selectLabelsAny []string for _, tag := range tags { diff --git a/pkg/query-service/app/metrics/v4/helpers/sub_query.go b/pkg/query-service/app/metrics/v4/helpers/sub_query.go index 97176e54bd..d4cd103719 100644 --- a/pkg/query-service/app/metrics/v4/helpers/sub_query.go +++ b/pkg/query-service/app/metrics/v4/helpers/sub_query.go @@ -3,14 +3,43 @@ package helpers import ( "fmt" "strings" + "time" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils" ) +var ( + sixHoursInMilliseconds = time.Hour.Milliseconds() * 6 + oneDayInMilliseconds = time.Hour.Milliseconds() * 24 +) + +// start and end are in milliseconds +func which(start, end int64) (int64, int64, string) { + // If time range is less than 6 hours, we need to use the `time_series_v4` table + // else if time range is less than 1 day and greater than 6 hours, we need to use the `time_series_v4_6hrs` table + // else we need to use the `time_series_v4_1day` table + var tableName string + if end-start <= sixHoursInMilliseconds { + // adjust the start time to nearest 1 hour + start = start - (start % (time.Hour.Milliseconds() * 1)) + tableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME + } else if end-start <= oneDayInMilliseconds { + // adjust the start time to nearest 6 hours + start = start - (start % (time.Hour.Milliseconds() * 6)) + tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME + } else { + // adjust the start time to nearest 1 day + start = start - (start % (time.Hour.Milliseconds() * 24)) + tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME + } + + return start, end, tableName +} + // PrepareTimeseriesFilterQuery builds the sub-query to be used for filtering timeseries based on the search criteria -func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) { +func PrepareTimeseriesFilterQuery(start, end int64, mq *v3.BuilderQuery) (string, error) { var conditions []string var fs *v3.FilterSet = mq.Filters var groupTags []v3.AttributeKey = mq.GroupBy @@ -18,6 +47,10 @@ func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) { conditions = append(conditions, fmt.Sprintf("metric_name = %s", utils.ClickHouseFormattedValue(mq.AggregateAttribute.Key))) conditions = append(conditions, fmt.Sprintf("temporality = '%s'", mq.Temporality)) + start, end, tableName := which(start, end) + + conditions = append(conditions, fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", start, end)) + if fs != nil && len(fs.Items) != 0 { for _, item := range fs.Items { toFormat := item.Value @@ -78,7 +111,7 @@ func PrepareTimeseriesFilterQuery(mq *v3.BuilderQuery) (string, error) { "SELECT DISTINCT %s FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, - constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, + tableName, whereClause, ) diff --git a/pkg/query-service/app/metrics/v4/query_builder.go b/pkg/query-service/app/metrics/v4/query_builder.go index 6543d6483b..ae9ee9b69a 100644 --- a/pkg/query-service/app/metrics/v4/query_builder.go +++ b/pkg/query-service/app/metrics/v4/query_builder.go @@ -21,8 +21,10 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P start, end = common.AdjustedMetricTimeRange(start, end, mq.StepInterval, mq.TimeAggregation) - groupBy := helpers.GroupByAttributeKeyTags(mq.GroupBy...) - orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, mq.GroupBy) + if mq.ShiftBy != 0 { + start = start - mq.ShiftBy*1000 + end = end - mq.ShiftBy*1000 + } var quantile float64 @@ -33,11 +35,21 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P // and time aggregation to rate mq.TimeAggregation = v3.TimeAggregationRate mq.SpaceAggregation = v3.SpaceAggregationSum - mq.GroupBy = append(mq.GroupBy, v3.AttributeKey{ - Key: "le", - Type: v3.AttributeKeyTypeTag, - DataType: v3.AttributeKeyDataTypeString, - }) + // If le is not present in group by for quantile, add it + leFound := false + for _, groupBy := range mq.GroupBy { + if groupBy.Key == "le" { + leFound = true + break + } + } + if !leFound { + mq.GroupBy = append(mq.GroupBy, v3.AttributeKey{ + Key: "le", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + }) + } } var query string @@ -60,6 +72,15 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P return "", err } + groupByWithoutLe := []v3.AttributeKey{} + for _, groupBy := range mq.GroupBy { + if groupBy.Key != "le" { + groupByWithoutLe = append(groupByWithoutLe, groupBy) + } + } + groupBy := helpers.GroupByAttributeKeyTags(groupByWithoutLe...) + orderBy := helpers.OrderByAttributeKeyTags(mq.OrderBy, groupByWithoutLe) + if quantile != 0 { query = fmt.Sprintf(`SELECT %s, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s`, groupBy, quantile, query, groupBy, orderBy) } diff --git a/pkg/query-service/app/metrics/v4/query_builder_test.go b/pkg/query-service/app/metrics/v4/query_builder_test.go index b132b42ca0..15e948520c 100644 --- a/pkg/query-service/app/metrics/v4/query_builder_test.go +++ b/pkg/query-service/app/metrics/v4/query_builder_test.go @@ -33,7 +33,7 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) { Disabled: false, // remaining struct fields are not needed here }, - expectedQueryContains: "SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Delta'", + expectedQueryContains: "SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Delta' AND unix_milli >= 1706428800000 AND unix_milli < 1706434026000", }, { name: "test prepare time series with no filters and group by", @@ -58,7 +58,7 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) { Disabled: false, // remaining struct fields are not needed here }, - expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative'", + expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1706428800000 AND unix_milli < 1706434026000", }, { name: "test prepare time series with no filters and multiple group by", @@ -90,7 +90,7 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) { Disabled: false, // remaining struct fields are not needed here }, - expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative'", + expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1706428800000 AND unix_milli < 1706434026000", }, { name: "test prepare time series with filters and multiple group by", @@ -138,13 +138,15 @@ func TestPrepareTimeseriesFilterQuery(t *testing.T) { Disabled: false, // remaining struct fields are not needed here }, - expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']", + expectedQueryContains: "SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4 WHERE metric_name = 'http_requests' AND temporality = 'Cumulative' AND unix_milli >= 1706428800000 AND unix_milli < 1706434026000 AND JSONExtractString(labels, 'service_name') != 'payment_service' AND JSONExtractString(labels, 'endpoint') IN ['/paycallback','/payme','/paypal']", }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - query, err := helpers.PrepareTimeseriesFilterQuery(testCase.builderQuery) + // 1706432226000 - 2:27:06 PM (IST) + // 1706434026000 - 2:57:06 PM (IST) + query, err := helpers.PrepareTimeseriesFilterQuery(1706432226000, 1706434026000, testCase.builderQuery) assert.Nil(t, err) assert.Contains(t, query, testCase.expectedQueryContains) }) @@ -191,7 +193,7 @@ func TestPrepareMetricQueryCumulativeRate(t *testing.T) { TimeAggregation: v3.TimeAggregationRate, SpaceAggregation: v3.SpaceAggregationSum, }, - expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, ts, sum(per_series_value) as value FROM (SELECT service_name, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", }, { name: "test time aggregation = rate, space aggregation = sum, temporality = cumulative, multiple group by", @@ -224,12 +226,14 @@ func TestPrepareMetricQueryCumulativeRate(t *testing.T) { TimeAggregation: v3.TimeAggregationRate, SpaceAggregation: v3.SpaceAggregationSum, }, - expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, endpoint, ts), (service_name, endpoint) ) ORDER BY service_name ASC, endpoint ASC, ts ASC", + expectedQueryContains: "SELECT service_name, endpoint, ts, sum(per_series_value) as value FROM (SELECT service_name, endpoint, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(endpoint) as endpoint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'endpoint') as endpoint, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Cumulative' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, endpoint, ts), (service_name, endpoint) ) ORDER BY service_name ASC, endpoint ASC, ts ASC", }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { + // 1650991982000 - April 26, 2022 10:23:02 PM + // 1651078382000 - April 27, 2022 10:23:02 PM query, err := PrepareMetricQuery(1650991982000, 1651078382000, v3.QueryTypeBuilder, v3.PanelTypeGraph, testCase.builderQuery, metricsV3.Options{}) assert.Nil(t, err) assert.Contains(t, query, testCase.expectedQueryContains) @@ -262,7 +266,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) { TimeAggregation: v3.TimeAggregationRate, SpaceAggregation: v3.SpaceAggregationSum, }, - expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY ts ORDER BY ts ASC", }, { name: "test time aggregation = rate, space aggregation = sum, temporality = delta, group by service_name", @@ -288,7 +292,7 @@ func TestPrepareMetricQueryDeltaRate(t *testing.T) { TimeAggregation: v3.TimeAggregationRate, SpaceAggregation: v3.SpaceAggregationSum, }, - expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta') as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND timestamp_ms >= 1650991920000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, fingerprint FROM signoz_metrics.time_series_v4_1day WHERE metric_name = 'signoz_calls_total' AND temporality = 'Delta' AND unix_milli >= 1650931200000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_calls_total' AND unix_milli >= 1650991920000 AND unix_milli < 1651078380000 GROUP BY GROUPING SETS ( (service_name, ts), (service_name) ) ORDER BY service_name ASC, ts ASC", }, } @@ -340,7 +344,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) { Disabled: false, SpaceAggregation: v3.SpaceAggregationPercentile99, }, - expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, ts, sum(per_series_value) as value FROM (SELECT service_name, le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(service_name) as service_name, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", }, { name: "test temporality = cumulative, quantile = 0.99 without group by", @@ -370,7 +374,7 @@ func TestPrepreMetricQueryCumulativeQuantile(t *testing.T) { Disabled: false, SpaceAggregation: v3.SpaceAggregationPercentile99, }, - expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, ts, sum(per_series_value) as value FROM (SELECT le, ts, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, nan, If((ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window) >= 86400, nan, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDate('1970-01-01')) OVER rate_window))) as per_series_value FROM (SELECT fingerprint, any(le) as le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, max(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Cumulative' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WINDOW rate_window as (PARTITION BY fingerprint ORDER BY fingerprint, ts)) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", }, } @@ -422,7 +426,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) { Disabled: false, SpaceAggregation: v3.SpaceAggregationPercentile99, }, - expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", + expectedQueryContains: "SELECT service_name, ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT service_name, le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'service_name') as service_name, JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY GROUPING SETS ( (service_name, le, ts), (service_name, le) ) ORDER BY service_name ASC, le ASC, ts ASC) GROUP BY service_name, ts ORDER BY service_name ASC, ts ASC", }, { name: "test temporality = delta, quantile = 0.99 no group by", @@ -452,7 +456,7 @@ func TestPrepreMetricQueryDeltaQuantile(t *testing.T) { Disabled: false, SpaceAggregation: v3.SpaceAggregationPercentile99, }, - expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.990) as value FROM (SELECT le, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, sum(value)/60 as value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'le') as le, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'signoz_latency_bucket' AND temporality = 'Delta' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000 AND like(JSONExtractString(labels, 'service_name'), '%frontend%')) as filtered_time_series USING fingerprint WHERE metric_name = 'signoz_latency_bucket' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY GROUPING SETS ( (le, ts), (le) ) ORDER BY le ASC, ts ASC) GROUP BY ts ORDER BY ts ASC", }, } @@ -490,7 +494,7 @@ func TestPrepareMetricQueryGauge(t *testing.T) { SpaceAggregation: v3.SpaceAggregationSum, Disabled: false, }, - expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", + expectedQueryContains: "SELECT ts, sum(per_series_value) as value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY ts ORDER BY ts ASC", }, { name: "test gauge query with group by host_name", @@ -516,7 +520,7 @@ func TestPrepareMetricQueryGauge(t *testing.T) { Expression: "A", Disabled: false, }, - expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v2 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v2 WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified') as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND timestamp_ms >= 1650991980000 AND timestamp_ms <= 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (host_name, ts), (host_name) ) ORDER BY host_name ASC, ts ASC", + expectedQueryContains: "SELECT host_name, ts, sum(per_series_value) as value FROM (SELECT fingerprint, any(host_name) as host_name, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), INTERVAL 60 SECOND) as ts, avg(value) as per_series_value FROM signoz_metrics.distributed_samples_v4 INNER JOIN (SELECT DISTINCT JSONExtractString(labels, 'host_name') as host_name, fingerprint FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name = 'system_cpu_usage' AND temporality = 'Unspecified' AND unix_milli >= 1650974400000 AND unix_milli < 1651078380000) as filtered_time_series USING fingerprint WHERE metric_name = 'system_cpu_usage' AND unix_milli >= 1650991980000 AND unix_milli < 1651078380000 GROUP BY fingerprint, ts ORDER BY fingerprint, ts) WHERE isNaN(per_series_value) = 0 GROUP BY GROUPING SETS ( (host_name, ts), (host_name) ) ORDER BY host_name ASC, ts ASC", }, } diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 36d2f9bc1f..ad2a9fd8de 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -935,11 +935,10 @@ func validateExpressions(expressions []string, funcs map[string]govaluate.Expres for _, exp := range expressions { evalExp, err := govaluate.NewEvaluableExpressionWithFunctions(exp, funcs) if err != nil { - errs = append(errs, err) + errs = append(errs, fmt.Errorf("invalid expression %s: %v", exp, err)) continue } - variables := evalExp.Vars() - for _, v := range variables { + for _, v := range evalExp.Vars() { var hasVariable bool for _, q := range cq.BuilderQueries { if q.Expression == v { @@ -961,7 +960,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE // parse the request body if err := json.NewDecoder(r.Body).Decode(&queryRangeParams); err != nil { - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} } // validate the request body @@ -969,7 +968,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} } - // prepare the variables for the corrspnding query type + // prepare the variables for the corresponding query type formattedVars := make(map[string]interface{}) for name, value := range queryRangeParams.Variables { if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL { @@ -1046,6 +1045,25 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE } } } + + var timeShiftBy int64 + if len(query.Functions) > 0 { + for idx := range query.Functions { + function := &query.Functions[idx] + if function.Name == v3.FunctionNameTimeShift { + // move the function to the beginning of the list + // so any other function can use the shifted time + var fns []v3.Function + fns = append(fns, *function) + fns = append(fns, query.Functions[:idx]...) + fns = append(fns, query.Functions[idx+1:]...) + query.Functions = fns + timeShiftBy = int64(function.Args[0].(float64)) + break + } + } + } + query.ShiftBy = timeShiftBy } } queryRangeParams.Variables = formattedVars diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index 86a472c064..50f19b89b1 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -276,14 +276,9 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa var wg sync.WaitGroup for queryName, builderQuery := range params.CompositeQuery.BuilderQueries { - if builderQuery.Disabled { - continue - } - wg.Add(1) if queryName == builderQuery.Expression { + wg.Add(1) go q.runBuilderQuery(ctx, builderQuery, params, keys, cacheKeys, ch, &wg) - } else { - go q.runBuilderExpression(ctx, builderQuery, params, keys, cacheKeys, ch, &wg) } } diff --git a/pkg/query-service/app/queryBuilder/functions.go b/pkg/query-service/app/queryBuilder/functions.go index d71bfd0d54..a933406d23 100644 --- a/pkg/query-service/app/queryBuilder/functions.go +++ b/pkg/query-service/app/queryBuilder/functions.go @@ -281,6 +281,21 @@ func ApplyFunction(fn v3.Function, result *v3.Result) *v3.Result { return funcMedian5(result) case v3.FunctionNameMedian7: return funcMedian7(result) + case v3.FunctionNameTimeShift: + shift, ok := fn.Args[0].(float64) + if !ok { + return result + } + return funcTimeShift(result, shift) + } + return result +} + +func funcTimeShift(result *v3.Result, shift float64) *v3.Result { + for _, series := range result.Series { + for idx, point := range series.Points { + series.Points[idx].Timestamp = point.Timestamp + int64(shift)*1000 + } } return result } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 8c697c895b..e7a482d02f 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -203,12 +203,16 @@ var GroupByColMap = map[string]struct{}{ } const ( - SIGNOZ_METRIC_DBNAME = "signoz_metrics" - SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2" - SIGNOZ_TIMESERIES_TABLENAME = "distributed_time_series_v2" - SIGNOZ_TRACE_DBNAME = "signoz_traces" - SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2" - SIGNOZ_TIMESERIES_LOCAL_TABLENAME = "time_series_v2" + SIGNOZ_METRIC_DBNAME = "signoz_metrics" + SIGNOZ_SAMPLES_TABLENAME = "distributed_samples_v2" + SIGNOZ_SAMPLES_V4_TABLENAME = "distributed_samples_v4" + SIGNOZ_TIMESERIES_TABLENAME = "distributed_time_series_v2" + SIGNOZ_TRACE_DBNAME = "signoz_traces" + SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2" + SIGNOZ_TIMESERIES_LOCAL_TABLENAME = "time_series_v2" + SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4" + SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs" + SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME = "time_series_v4_1day" ) var TimeoutExcludedRoutes = map[string]bool{ diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index a75a2f5f30..e15b1db67e 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -99,6 +99,7 @@ type Reader interface { CheckClickHouse(ctx context.Context) error GetLatencyMetricMetadata(context.Context, string, string, bool) (*v3.LatencyMetricMetadataResponse, error) + GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error) } type Querier interface { diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index a960797ff3..0fe9dcdde2 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -463,6 +463,23 @@ const ( TimeAggregationIncrease TimeAggregation = "increase" ) +func (t TimeAggregation) Validate() error { + switch t { + case TimeAggregationAnyLast, + TimeAggregationSum, + TimeAggregationAvg, + TimeAggregationMin, + TimeAggregationMax, + TimeAggregationCount, + TimeAggregationCountDistinct, + TimeAggregationRate, + TimeAggregationIncrease: + return nil + default: + return fmt.Errorf("invalid time aggregation: %s", t) + } +} + func (t TimeAggregation) IsRateOperator() bool { switch t { case TimeAggregationRate, TimeAggregationIncrease: @@ -488,6 +505,24 @@ const ( SpaceAggregationPercentile99 SpaceAggregation = "percentile_99" ) +func (s SpaceAggregation) Validate() error { + switch s { + case SpaceAggregationSum, + SpaceAggregationAvg, + SpaceAggregationMin, + SpaceAggregationMax, + SpaceAggregationCount, + SpaceAggregationPercentile50, + SpaceAggregationPercentile75, + SpaceAggregationPercentile90, + SpaceAggregationPercentile95, + SpaceAggregationPercentile99: + return nil + default: + return fmt.Errorf("invalid space aggregation: %s", s) + } +} + func IsPercentileOperator(operator SpaceAggregation) bool { switch operator { case SpaceAggregationPercentile50, @@ -536,6 +571,7 @@ const ( FunctionNameMedian3 FunctionName = "median3" FunctionNameMedian5 FunctionName = "median5" FunctionNameMedian7 FunctionName = "median7" + FunctionNameTimeShift FunctionName = "timeShift" ) func (f FunctionName) Validate() error { @@ -553,7 +589,8 @@ func (f FunctionName) Validate() error { FunctionNameEWMA7, FunctionNameMedian3, FunctionNameMedian5, - FunctionNameMedian7: + FunctionNameMedian7, + FunctionNameTimeShift: return nil default: return fmt.Errorf("invalid function name: %s", f) @@ -587,6 +624,7 @@ type BuilderQuery struct { TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"` SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"` Functions []Function `json:"functions,omitempty"` + ShiftBy int64 } func (b *BuilderQuery) Validate() error { @@ -604,10 +642,19 @@ func (b *BuilderQuery) Validate() error { return fmt.Errorf("data source is invalid: %w", err) } if b.DataSource == DataSourceMetrics { - if b.TimeAggregation == TimeAggregationUnspecified { + // if AggregateOperator is specified, then the request is using v3 payload + if b.AggregateOperator != "" { if err := b.AggregateOperator.Validate(); err != nil { return fmt.Errorf("aggregate operator is invalid: %w", err) } + } else { + if err := b.TimeAggregation.Validate(); err != nil { + return fmt.Errorf("time aggregation is invalid: %w", err) + } + + if err := b.SpaceAggregation.Validate(); err != nil { + return fmt.Errorf("space aggregation is invalid: %w", err) + } } } else { if err := b.AggregateOperator.Validate(); err != nil { @@ -661,6 +708,28 @@ func (b *BuilderQuery) Validate() error { if err := function.Name.Validate(); err != nil { return fmt.Errorf("function name is invalid: %w", err) } + if function.Name == FunctionNameTimeShift { + if len(function.Args) == 0 { + return fmt.Errorf("timeShiftBy param missing in query") + } + } else if function.Name == FunctionNameEWMA3 || + function.Name == FunctionNameEWMA5 || + function.Name == FunctionNameEWMA7 { + if len(function.Args) == 0 { + return fmt.Errorf("alpha param missing in query") + } + alpha := function.Args[0].(float64) + if alpha < 0 || alpha > 1 { + return fmt.Errorf("alpha param should be between 0 and 1") + } + } else if function.Name == FunctionNameCutOffMax || + function.Name == FunctionNameCutOffMin || + function.Name == FunctionNameClampMax || + function.Name == FunctionNameClampMin { + if len(function.Args) == 0 { + return fmt.Errorf("threshold param missing in query") + } + } } } @@ -919,3 +988,13 @@ type LatencyMetricMetadataResponse struct { Delta bool `json:"delta"` Le []float64 `json:"le"` } + +type MetricMetadataResponse struct { + Delta bool `json:"delta"` + Le []float64 `json:"le"` + Description string `json:"description"` + Unit string `json:"unit"` + Type string `json:"type"` + IsMonotonic bool `json:"isMonotonic"` + Temporality string `json:"temporality"` +}