fix(metrics-explorer): added updated metadata in list summary (#7381)

* fix(metrics-explorer): added updated metadata in list summary

* fix(metrics-explorer): added skipSignozMetric in aggregate attribute

* fix(metrics-explorer): updated last received query
aniketio-ctrl 2025-03-20 18:46:05 +05:30 committed by GitHub
parent 9d8e46e5b2
commit f79a5a2db6
8 changed files with 140 additions and 88 deletions

View File

@@ -2893,11 +2893,11 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s
if cacheErr != nil {
zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr))
}
if updatedMetadata != nil {
if metadata, exist := updatedMetadata[metricName]; exist {
if _, exists := metricNameToTemporality[metricName]; !exists {
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
}
metricNameToTemporality[metricName][updatedMetadata.Temporality] = true
metricNameToTemporality[metricName][metadata.Temporality] = true
} else {
metricNamesToQuery = append(metricNamesToQuery, metricName)
}
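
For reference, the temporality lookup after this change reads roughly as follows; this is a sketch that reuses FetchTemporality's local variables and assumes updatedMetadata is the map returned by the new batched GetUpdatedMetricsMetadata:

// updatedMetadata is assumed to hold the batched metadata lookup for all requested metrics.
if metadata, exist := updatedMetadata[metricName]; exist {
	if _, exists := metricNameToTemporality[metricName]; !exists {
		metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
	}
	metricNameToTemporality[metricName][metadata.Temporality] = true
} else {
	// Not in the cached map: this metric still needs a ClickHouse temporality query.
	metricNamesToQuery = append(metricNamesToQuery, metricName)
}
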
@@ -3696,11 +3696,7 @@ func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string)
return &result, nil
}
func (r *ClickHouseReader) GetMetricAggregateAttributes(
ctx context.Context,
req *v3.AggregateAttributeRequest,
skipDotNames bool,
) (*v3.AggregateAttributeResponse, error) {
func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) {
var query string
var err error
@@ -3731,11 +3727,18 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(
continue
}
if skipSignozMetrics && strings.HasPrefix(metricName, "signoz_") {
continue
}
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName)
if apiError == nil && metadata != nil {
typ = string(metadata.MetricType)
isMonotonic = metadata.IsMonotonic
temporality = string(metadata.Temporality)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[metricName]; exist {
typ = string(updatedMetadata.MetricType)
isMonotonic = updatedMetadata.IsMonotonic
temporality = string(updatedMetadata.Temporality)
}
// Non-monotonic cumulative sums are treated as gauges
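
The new skipSignozMetrics flag only drops metric names carrying the signoz_ prefix from the aggregate-attribute results. A hedged usage sketch, reusing the v3 request type from this repository; the search text is illustrative, and the flag values mirror the call sites changed later in this commit:

req := &v3.AggregateAttributeRequest{
	DataSource: v3.DataSourceMetrics,
	SearchText: "http_", // illustrative search text
	Limit:      50,
}
// Autocomplete keeps SigNoz-internal metrics (skipSignozMetrics = false),
// while the metrics-explorer filter-values path hides them (skipSignozMetrics = true).
resp, err := reader.GetMetricAggregateAttributes(ctx, req, true /* skipDotNames */, false /* skipSignozMetrics */)
if err != nil {
	zap.L().Error("error fetching metric aggregate attributes", zap.Error(err))
}
_ = resp
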
@@ -3768,7 +3771,7 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F
var response v3.FilterAttributeKeyResponse
// skips the internal attributes i.e attributes starting with __
query = fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY tagKeys) WHERE distinctTagKey ILIKE $3 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day)
query = fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND __normalized = true GROUP BY tagKeys) WHERE distinctTagKey ILIKE $3 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day)
if req.Limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
}
@@ -3859,15 +3862,21 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se
}
}
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName)
if apiError == nil && metadata != nil {
metricType = string(metadata.MetricType)
temporality = string(metadata.Temporality)
if apiError != nil {
zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[metricName]; exist {
metricType = string(updatedMetadata.MetricType)
temporality = string(updatedMetadata.Temporality)
if temporality == string(v3.Delta) {
deltaExists = true
}
isMonotonic = metadata.IsMonotonic
if metadata.Description != "" {
description = metadata.Description
isMonotonic = updatedMetadata.IsMonotonic
if updatedMetadata.Description != "" {
description = updatedMetadata.Description
}
if updatedMetadata.Unit != "" {
unit = updatedMetadata.Unit
}
}
@@ -5862,14 +5871,24 @@ func (r *ClickHouseReader) GetMetricsLastReceived(ctx context.Context, metricNam
MAX(unix_milli) AS last_received_time
FROM %s.%s
WHERE metric_name = ?
`, signozMetricDBName, signozSampleTableName)
`, signozMetricDBName, signozSamplesAgg30mLocalTableName)
var lastReceived int64
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
err := r.db.QueryRow(valueCtx, query, metricName).Scan(&lastReceived)
if err != nil {
return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
}
return lastReceived, nil // Convert to uint64 before returning
query = fmt.Sprintf(`SELECT
MAX(unix_milli) AS last_received_time
FROM %s.%s
WHERE metric_name = ? and unix_milli > ?
`, signozMetricDBName, signozSampleTableName)
var finalLastReceived int64
err = r.db.QueryRow(valueCtx, query, metricName, lastReceived).Scan(&finalLastReceived)
if err != nil {
return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
}
return finalLastReceived, nil // Convert to uint64 before returning
}
func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) {
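
The last-received lookup is now two-phase: a cheap MAX(unix_milli) over the 30-minute aggregated samples table narrows the window, and a second query over the raw samples table, bounded by that coarse value, returns the exact timestamp. A rough sketch of the same flow; the method name lastReceivedSketch is hypothetical, the table constants are the ones used in the diff, and r.db is assumed to expose QueryRow(ctx, query, args...) as in the surrounding reader code:

func (r *ClickHouseReader) lastReceivedSketch(ctx context.Context, metricName string) (int64, *model.ApiError) {
	// Phase 1: coarse upper bound from the pre-aggregated 30m table (small, fast scan).
	var coarse int64
	coarseQuery := fmt.Sprintf("SELECT MAX(unix_milli) FROM %s.%s WHERE metric_name = ?",
		signozMetricDBName, signozSamplesAgg30mLocalTableName)
	if err := r.db.QueryRow(ctx, coarseQuery, metricName).Scan(&coarse); err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// Phase 2: exact timestamp from the raw samples table, limited to rows newer than the coarse bound.
	var exact int64
	exactQuery := fmt.Sprintf("SELECT MAX(unix_milli) FROM %s.%s WHERE metric_name = ? AND unix_milli > ?",
		signozMetricDBName, signozSampleTableName)
	if err := r.db.QueryRow(ctx, exactQuery, metricName, coarse).Scan(&exact); err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return exact, nil
}
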
@@ -5890,11 +5909,11 @@ func (r *ClickHouseReader) GetAttributesForMetricName(ctx context.Context, metri
const baseQueryTemplate = `
SELECT
kv.1 AS key,
arrayMap(x -> trim(BOTH '"' FROM x), groupUniqArray(10000)(kv.2)) AS values,
arrayMap(x -> trim(BOTH '"' FROM x), groupUniqArray(1000)(kv.2)) AS values,
length(groupUniqArray(10000)(kv.2)) AS valueCount
FROM %s.%s
ARRAY JOIN arrayFilter(x -> NOT startsWith(x.1, '__'), JSONExtractKeysAndValuesRaw(labels)) AS kv
WHERE metric_name = ?`
WHERE metric_name = ? AND __normalized=true`
var args []interface{}
args = append(args, metricName)
@@ -6022,7 +6041,6 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
zap.L().Error("Error iterating over metric rows", zap.Error(err))
return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
}
// If no metrics were found, return early.
if len(metricNames) == 0 {
return &response, nil
@@ -6130,8 +6148,23 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
}
//get updated metrics data
batch, apiError := r.GetUpdatedMetricsMetadata(ctx, metricNames...)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
var filteredMetrics []metrics_explorer.MetricDetail
for i := range response.Metrics {
if updatedMetrics, exists := batch[response.Metrics[i].MetricName]; exists {
response.Metrics[i].MetricType = string(updatedMetrics.MetricType)
if updatedMetrics.Unit != "" {
response.Metrics[i].MetricUnit = updatedMetrics.Unit
}
if updatedMetrics.Description != "" {
response.Metrics[i].Description = updatedMetrics.Description
}
}
if samples, exists := samplesMap[response.Metrics[i].MetricName]; exists {
response.Metrics[i].Samples = samples
if lastReceived, exists := lastReceivedMap[response.Metrics[i].MetricName]; exists {
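
The metadata overlay step above can be read as a small helper: user-updated metadata wins, but empty unit and description values fall back to what ClickHouse returned. A sketch with a hypothetical helper name (applyUpdatedMetadata), using only types visible in this diff:

// applyUpdatedMetadata overlays cached/updated metadata onto the summary rows.
func applyUpdatedMetadata(metrics []metrics_explorer.MetricDetail, batch map[string]*model.UpdateMetricsMetadata) {
	for i := range metrics {
		updated, ok := batch[metrics[i].MetricName]
		if !ok {
			continue // keep the values derived from the time-series tables
		}
		metrics[i].MetricType = string(updated.MetricType)
		if updated.Unit != "" {
			metrics[i].MetricUnit = updated.Unit
		}
		if updated.Description != "" {
			metrics[i].Description = updated.Description
		}
	}
}
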
@@ -6242,6 +6275,7 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
FROM %s.%s AS ts
WHERE NOT startsWith(ts.metric_name, 'signoz_')
AND __normalized = true
AND unix_milli BETWEEN ? AND ?
%s
GROUP BY ts.metric_name
ORDER BY timeSeries DESC
@@ -6250,7 +6284,7 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
)
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
rows, err := r.db.Query(valueCtx, metricsQuery)
rows, err := r.db.Query(valueCtx, metricsQuery, start, end)
if err != nil {
zap.L().Error("Error executing metrics query", zap.Error(err))
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
@@ -6853,53 +6887,6 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
return hasLE, nil
}
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metricName string) (*model.UpdateMetricsMetadata, *model.ApiError) {
metricsMetadata := new(model.UpdateMetricsMetadata)
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
// Try to get from cache first
retrieveStatus, err := r.cache.Retrieve(ctx, cacheKey, metricsMetadata, true)
if err == nil && retrieveStatus == cache.RetrieveStatusHit {
return metricsMetadata, nil
}
if err != nil {
zap.L().Info("Error retrieving metrics metadata from cache", zap.String("metric_name", metricName), zap.Error(err))
} else {
zap.L().Info("Cache miss for metrics metadata", zap.String("metric_name", metricName))
}
// Query from database if cache missed
query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
FROM %s.%s
WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
fmt.Printf("Executing Query: %q\n", query)
row := r.db.QueryRow(valueCtx, query, metricName)
err = row.Scan(
&metricsMetadata.MetricName,
&metricsMetadata.MetricType,
&metricsMetadata.Description,
&metricsMetadata.Temporality,
&metricsMetadata.IsMonotonic,
&metricsMetadata.Unit,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil // No data found
}
return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metrics metadata: %v", err)}
}
// Try caching the result, but don't return error if caching fails
if cacheErr := r.cache.Store(ctx, cacheKey, metricsMetadata, -1); cacheErr != nil {
zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metricName), zap.Error(cacheErr))
}
return metricsMetadata, nil
}
func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error {
var allMetricsMetadata []model.UpdateMetricsMetadata
var errorList []error
@@ -6921,3 +6908,62 @@ func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error {
return errorList
}
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
var missingMetrics []string
// First, try retrieving each metric from cache.
for _, metricName := range metricNames {
metadata := new(model.UpdateMetricsMetadata)
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
retrieveStatus, err := r.cache.Retrieve(ctx, cacheKey, metadata, true)
if err == nil && retrieveStatus == cache.RetrieveStatusHit {
cachedMetadata[metricName] = metadata
} else {
if err != nil {
zap.L().Error("Error retrieving metrics metadata from cache", zap.String("metric_name", metricName), zap.Error(err))
}
missingMetrics = append(missingMetrics, metricName)
}
}
// If there are any metrics missing in the cache, query them from the database.
if len(missingMetrics) > 0 {
// Join the missing metric names; ensure proper quoting if needed.
metricList := "'" + strings.Join(metricNames, "', '") + "'"
query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
FROM %s.%s
WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList)
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
rows, err := r.db.Query(valueCtx, query)
if err != nil {
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metrics metadata: %v", err)}
}
defer rows.Close()
for rows.Next() {
metadata := new(model.UpdateMetricsMetadata)
if err := rows.Scan(
&metadata.MetricName,
&metadata.MetricType,
&metadata.Description,
&metadata.Temporality,
&metadata.IsMonotonic,
&metadata.Unit,
); err != nil {
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)}
}
// Cache the result for future requests.
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
if cacheErr := r.cache.Store(ctx, cacheKey, metadata, -1); cacheErr != nil {
zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
}
cachedMetadata[metadata.MetricName] = metadata
}
}
return cachedMetadata, nil
}
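
The replacement GetUpdatedMetricsMetadata accepts any number of metric names and returns whatever it could resolve from cache plus the database, so callers batch one lookup per request. A hedged usage sketch; the metric names are illustrative:

names := []string{"http_server_duration_count", "db_client_connections_usage"}
metadataByName, apiErr := reader.GetUpdatedMetricsMetadata(ctx, names...)
if apiErr != nil {
	// The partially filled map is still usable; missing entries simply keep their stored values.
	zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiErr))
}
for _, name := range names {
	if md, ok := metadataByName[name]; ok {
		fmt.Println(name, md.MetricType, md.Temporality, md.Unit)
	}
}
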

View File

@@ -4684,7 +4684,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
switch req.DataSource {
case v3.DataSourceMetrics:
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true)
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true, false)
case v3.DataSourceLogs:
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:

View File

@@ -54,7 +54,7 @@ func (receiver *SummaryService) FilterValues(ctx context.Context, params *metric
case "metric_name":
var filterValues []string
request := v3.AggregateAttributeRequest{DataSource: v3.DataSourceMetrics, SearchText: params.SearchText, Limit: params.Limit}
attributes, err := receiver.reader.GetMetricAggregateAttributes(ctx, &request, true)
attributes, err := receiver.reader.GetMetricAggregateAttributes(ctx, &request, true, true)
if err != nil {
return nil, model.InternalError(err)
}
@@ -104,6 +104,8 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
metricDetailsDTO.Metadata.MetricType = metadata.Type
metricDetailsDTO.Metadata.Description = metadata.Description
metricDetailsDTO.Metadata.Unit = metadata.Unit
metricDetailsDTO.Metadata.Temporality = metadata.Temporality
metricDetailsDTO.Metadata.Monotonic = metadata.IsMonotonic
return nil
})

View File

@@ -230,9 +230,12 @@ func (q *querier) runBuilderQuery(
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, builderQuery.AggregateAttribute.Key)
if apiError == nil && metadata != nil {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(metadata.MetricType)
builderQuery.Temporality = metadata.Temporality
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
builderQuery.Temporality = updatedMetadata.Temporality
}
}

View File

@@ -231,9 +231,12 @@ func (q *querier) runBuilderQuery(
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, builderQuery.AggregateAttribute.Key)
if apiError == nil && metadata != nil {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(metadata.MetricType)
builderQuery.Temporality = metadata.Temporality
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
builderQuery.Temporality = updatedMetadata.Temporality
}
}

View File

@@ -48,7 +48,7 @@ type Reader interface {
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error)
GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool) (*v3.AggregateAttributeResponse, error)
GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error)
GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
@@ -142,7 +142,7 @@ type Reader interface {
DeleteMetricsMetadata(ctx context.Context, metricName string) *model.ApiError
UpdateMetricsMetadata(ctx context.Context, req *model.UpdateMetricsMetadata) *model.ApiError
GetUpdatedMetricsMetadata(ctx context.Context, metricName string) (*model.UpdateMetricsMetadata, *model.ApiError)
GetUpdatedMetricsMetadata(ctx context.Context, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError)
CheckForLabelsInMetric(ctx context.Context, metricName string, labels []string) (bool, *model.ApiError)
}
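
Because the interface method is now variadic, single-metric call sites keep their old shape while batch callers expand a slice. A tiny sketch of both call shapes; reader is any implementation of this interface and the metric names are illustrative:

// Single metric: the previous call shape still compiles against the variadic signature.
single, _ := reader.GetUpdatedMetricsMetadata(ctx, "http_server_duration_count")
// Batch: expand a slice of names collected elsewhere (e.g. from a list query).
names := []string{"first_metric", "second_metric"}
batch, _ := reader.GetUpdatedMetricsMetadata(ctx, names...)
_, _ = single, batch
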

View File

@@ -65,6 +65,8 @@ type Metadata struct {
MetricType string `json:"metric_type"`
Description string `json:"description"`
Unit string `json:"unit"`
Temporality string `json:"temporality"`
Monotonic bool `json:"monotonic"`
}
// Alert represents individual alerts associated with the metric.
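
With the two added fields, the serialized summary metadata also reports temporality and monotonicity. A small sketch of how the struct marshals, assuming encoding/json and fmt are imported; the package qualifier and field values are illustrative, and only the fields visible in this hunk are set:

meta := metrics_explorer.Metadata{
	MetricType:  "Sum",
	Description: "Total number of HTTP requests",
	Unit:        "1",
	Temporality: "Cumulative",
	Monotonic:   true,
}
out, _ := json.Marshal(meta)
// The new keys serialize as "temporality" and "monotonic" alongside the existing ones.
fmt.Println(string(out))
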

View File

@@ -2,7 +2,7 @@ package rules
import (
"context"
"database/sql"
"fmt"
"go.signoz.io/signoz/pkg/cache"
"go.signoz.io/signoz/pkg/cache/memorycache"
"go.signoz.io/signoz/pkg/factory/factorytest"
@@ -1227,10 +1227,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
for idx, c := range cases {
rows := cmock.NewRows(cols, c.values)
cacheCols := make([]cmock.ColumnType, 0)
mock.ExpectQueryRow(".*").WillReturnRow(cmock.NewRow(cacheCols, nil)).WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
// We are testing the eval logic after the query is run
// so we don't care about the query string here
queryString := "SELECT any"
@@ -1331,8 +1328,7 @@ func TestThresholdRuleNoData(t *testing.T) {
for idx, c := range cases {
rows := cmock.NewRows(cols, c.values)
cacheCols := make([]cmock.ColumnType, 0)
mock.ExpectQueryRow(".*").WillReturnRow(cmock.NewRow(cacheCols, nil)).WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
// We are testing the eval logic after the query is run
// so we don't care about the query string here