diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index c748f50186..e253a3c44f 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2893,11 +2893,11 @@ func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []s if cacheErr != nil { zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr)) } - if updatedMetadata != nil { + if metadata, exist := updatedMetadata[metricName]; exist { if _, exists := metricNameToTemporality[metricName]; !exists { metricNameToTemporality[metricName] = make(map[v3.Temporality]bool) } - metricNameToTemporality[metricName][updatedMetadata.Temporality] = true + metricNameToTemporality[metricName][metadata.Temporality] = true } else { metricNamesToQuery = append(metricNamesToQuery, metricName) } @@ -3696,11 +3696,7 @@ func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string) return &result, nil } -func (r *ClickHouseReader) GetMetricAggregateAttributes( - ctx context.Context, - req *v3.AggregateAttributeRequest, - skipDotNames bool, -) (*v3.AggregateAttributeResponse, error) { +func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) { var query string var err error @@ -3731,11 +3727,18 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes( continue } + if skipSignozMetrics && strings.HasPrefix(metricName, "signoz_") { + continue + } + metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName) - if apiError == nil && metadata != nil { - typ = string(metadata.MetricType) - isMonotonic = metadata.IsMonotonic - temporality = string(metadata.Temporality) + if apiError != nil { + zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError)) + } + if updatedMetadata, exist := metadata[metricName]; exist { + typ = string(updatedMetadata.MetricType) + isMonotonic = updatedMetadata.IsMonotonic + temporality = string(updatedMetadata.Temporality) } // Non-monotonic cumulative sums are treated as gauges @@ -3768,7 +3771,7 @@ func (r *ClickHouseReader) GetMetricAttributeKeys(ctx context.Context, req *v3.F var response v3.FilterAttributeKeyResponse // skips the internal attributes i.e attributes starting with __ - query = fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY tagKeys) WHERE distinctTagKey ILIKE $3 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day) + query = fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND __normalized = true GROUP BY tagKeys) WHERE distinctTagKey ILIKE $3 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day) if req.Limit != 0 { query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) } @@ -3859,15 +3862,21 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se } } metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName) - if apiError == nil && metadata != nil { - metricType = string(metadata.MetricType) - temporality = string(metadata.Temporality) + if apiError != nil { + zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError)) + } + if updatedMetadata, exist := metadata[metricName]; exist { + metricType = string(updatedMetadata.MetricType) + temporality = string(updatedMetadata.Temporality) if temporality == string(v3.Delta) { deltaExists = true } - isMonotonic = metadata.IsMonotonic - if metadata.Description != "" { - description = metadata.Description + isMonotonic = updatedMetadata.IsMonotonic + if updatedMetadata.Description != "" { + description = updatedMetadata.Description + } + if updatedMetadata.Unit != "" { + unit = updatedMetadata.Unit } } @@ -5862,14 +5871,24 @@ func (r *ClickHouseReader) GetMetricsLastReceived(ctx context.Context, metricNam MAX(unix_milli) AS last_received_time FROM %s.%s WHERE metric_name = ? -`, signozMetricDBName, signozSampleTableName) +`, signozMetricDBName, signozSamplesAgg30mLocalTableName) var lastReceived int64 valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) err := r.db.QueryRow(valueCtx, query, metricName).Scan(&lastReceived) if err != nil { return 0, &model.ApiError{Typ: "ClickHouseError", Err: err} } - return lastReceived, nil // Convert to uint64 before returning + query = fmt.Sprintf(`SELECT + MAX(unix_milli) AS last_received_time +FROM %s.%s +WHERE metric_name = ? and unix_milli > ? +`, signozMetricDBName, signozSampleTableName) + var finalLastReceived int64 + err = r.db.QueryRow(valueCtx, query, metricName, lastReceived).Scan(&finalLastReceived) + if err != nil { + return 0, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + return finalLastReceived, nil // Convert to uint64 before returning } func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) { @@ -5890,11 +5909,11 @@ func (r *ClickHouseReader) GetAttributesForMetricName(ctx context.Context, metri const baseQueryTemplate = ` SELECT kv.1 AS key, - arrayMap(x -> trim(BOTH '"' FROM x), groupUniqArray(10000)(kv.2)) AS values, + arrayMap(x -> trim(BOTH '"' FROM x), groupUniqArray(1000)(kv.2)) AS values, length(groupUniqArray(10000)(kv.2)) AS valueCount FROM %s.%s ARRAY JOIN arrayFilter(x -> NOT startsWith(x.1, '__'), JSONExtractKeysAndValuesRaw(labels)) AS kv -WHERE metric_name = ?` +WHERE metric_name = ? AND __normalized=true` var args []interface{} args = append(args, metricName) @@ -6022,7 +6041,6 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_ zap.L().Error("Error iterating over metric rows", zap.Error(err)) return &response, &model.ApiError{Typ: "ClickHouseError", Err: err} } - // If no metrics were found, return early. if len(metricNames) == 0 { return &response, nil @@ -6130,8 +6148,23 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_ return &response, &model.ApiError{Typ: "ClickHouseError", Err: err} } + //get updated metrics data + batch, apiError := r.GetUpdatedMetricsMetadata(ctx, metricNames...) + if apiError != nil { + zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError)) + } + var filteredMetrics []metrics_explorer.MetricDetail for i := range response.Metrics { + if updatedMetrics, exists := batch[response.Metrics[i].MetricName]; exists { + response.Metrics[i].MetricType = string(updatedMetrics.MetricType) + if updatedMetrics.Unit != "" { + response.Metrics[i].MetricUnit = updatedMetrics.Unit + } + if updatedMetrics.Description != "" { + response.Metrics[i].Description = updatedMetrics.Description + } + } if samples, exists := samplesMap[response.Metrics[i].MetricName]; exists { response.Metrics[i].Samples = samples if lastReceived, exists := lastReceivedMap[response.Metrics[i].MetricName]; exists { @@ -6242,6 +6275,7 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req FROM %s.%s AS ts WHERE NOT startsWith(ts.metric_name, 'signoz_') AND __normalized = true + AND unix_milli BETWEEN ? AND ? %s GROUP BY ts.metric_name ORDER BY timeSeries DESC @@ -6250,7 +6284,7 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req ) valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) - rows, err := r.db.Query(valueCtx, metricsQuery) + rows, err := r.db.Query(valueCtx, metricsQuery, start, end) if err != nil { zap.L().Error("Error executing metrics query", zap.Error(err)) return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} @@ -6853,53 +6887,6 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam return hasLE, nil } -func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metricName string) (*model.UpdateMetricsMetadata, *model.ApiError) { - metricsMetadata := new(model.UpdateMetricsMetadata) - cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName - - // Try to get from cache first - retrieveStatus, err := r.cache.Retrieve(ctx, cacheKey, metricsMetadata, true) - if err == nil && retrieveStatus == cache.RetrieveStatusHit { - return metricsMetadata, nil - } - - if err != nil { - zap.L().Info("Error retrieving metrics metadata from cache", zap.String("metric_name", metricName), zap.Error(err)) - } else { - zap.L().Info("Cache miss for metrics metadata", zap.String("metric_name", metricName)) - } - - // Query from database if cache missed - query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit - FROM %s.%s - WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataTable) - - valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) - fmt.Printf("Executing Query: %q\n", query) - row := r.db.QueryRow(valueCtx, query, metricName) - err = row.Scan( - &metricsMetadata.MetricName, - &metricsMetadata.MetricType, - &metricsMetadata.Description, - &metricsMetadata.Temporality, - &metricsMetadata.IsMonotonic, - &metricsMetadata.Unit, - ) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil // No data found - } - return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metrics metadata: %v", err)} - } - - // Try caching the result, but don't return error if caching fails - if cacheErr := r.cache.Store(ctx, cacheKey, metricsMetadata, -1); cacheErr != nil { - zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metricName), zap.Error(cacheErr)) - } - - return metricsMetadata, nil -} - func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error { var allMetricsMetadata []model.UpdateMetricsMetadata var errorList []error @@ -6921,3 +6908,62 @@ func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error { return errorList } + +func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) { + cachedMetadata := make(map[string]*model.UpdateMetricsMetadata) + var missingMetrics []string + + // First, try retrieving each metric from cache. + for _, metricName := range metricNames { + metadata := new(model.UpdateMetricsMetadata) + cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName + retrieveStatus, err := r.cache.Retrieve(ctx, cacheKey, metadata, true) + if err == nil && retrieveStatus == cache.RetrieveStatusHit { + cachedMetadata[metricName] = metadata + } else { + if err != nil { + zap.L().Error("Error retrieving metrics metadata from cache", zap.String("metric_name", metricName), zap.Error(err)) + } + missingMetrics = append(missingMetrics, metricName) + } + } + + // If there are any metrics missing in the cache, query them from the database. + if len(missingMetrics) > 0 { + // Join the missing metric names; ensure proper quoting if needed. + metricList := "'" + strings.Join(metricNames, "', '") + "'" + query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit + FROM %s.%s + WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList) + + valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) + rows, err := r.db.Query(valueCtx, query) + if err != nil { + return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metrics metadata: %v", err)} + } + defer rows.Close() + + for rows.Next() { + metadata := new(model.UpdateMetricsMetadata) + if err := rows.Scan( + &metadata.MetricName, + &metadata.MetricType, + &metadata.Description, + &metadata.Temporality, + &metadata.IsMonotonic, + &metadata.Unit, + ); err != nil { + return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)} + } + + // Cache the result for future requests. + cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName + if cacheErr := r.cache.Store(ctx, cacheKey, metadata, -1); cacheErr != nil { + zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr)) + } + cachedMetadata[metadata.MetricName] = metadata + } + } + + return cachedMetadata, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 843a9bc76c..d4a2f6935d 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -4684,7 +4684,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r * switch req.DataSource { case v3.DataSourceMetrics: - response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true) + response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true, false) case v3.DataSourceLogs: response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req) case v3.DataSourceTraces: diff --git a/pkg/query-service/app/metricsexplorer/summary.go b/pkg/query-service/app/metricsexplorer/summary.go index 9684213800..49cd77700a 100644 --- a/pkg/query-service/app/metricsexplorer/summary.go +++ b/pkg/query-service/app/metricsexplorer/summary.go @@ -54,7 +54,7 @@ func (receiver *SummaryService) FilterValues(ctx context.Context, params *metric case "metric_name": var filterValues []string request := v3.AggregateAttributeRequest{DataSource: v3.DataSourceMetrics, SearchText: params.SearchText, Limit: params.Limit} - attributes, err := receiver.reader.GetMetricAggregateAttributes(ctx, &request, true) + attributes, err := receiver.reader.GetMetricAggregateAttributes(ctx, &request, true, true) if err != nil { return nil, model.InternalError(err) } @@ -104,6 +104,8 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam metricDetailsDTO.Metadata.MetricType = metadata.Type metricDetailsDTO.Metadata.Description = metadata.Description metricDetailsDTO.Metadata.Unit = metadata.Unit + metricDetailsDTO.Metadata.Temporality = metadata.Temporality + metricDetailsDTO.Metadata.Monotonic = metadata.IsMonotonic return nil }) diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index 7ec091bd4c..18cee86b1f 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -230,9 +230,12 @@ func (q *querier) runBuilderQuery( if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode { metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, builderQuery.AggregateAttribute.Key) - if apiError == nil && metadata != nil { - builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(metadata.MetricType) - builderQuery.Temporality = metadata.Temporality + if apiError != nil { + zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError)) + } + if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist { + builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType) + builderQuery.Temporality = updatedMetadata.Temporality } } diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 7eeaa7d2d5..d0bc32f990 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -231,9 +231,12 @@ func (q *querier) runBuilderQuery( if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode { metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, builderQuery.AggregateAttribute.Key) - if apiError == nil && metadata != nil { - builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(metadata.MetricType) - builderQuery.Temporality = metadata.Temporality + if apiError != nil { + zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError)) + } + if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist { + builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType) + builderQuery.Temporality = updatedMetadata.Temporality } } diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index adf220924b..b3a4d5a9d5 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -48,7 +48,7 @@ type Reader interface { SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) - GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool) (*v3.AggregateAttributeResponse, error) + GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest, skipDotNames bool, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) GetMetricAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetMetricAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) @@ -142,7 +142,7 @@ type Reader interface { DeleteMetricsMetadata(ctx context.Context, metricName string) *model.ApiError UpdateMetricsMetadata(ctx context.Context, req *model.UpdateMetricsMetadata) *model.ApiError - GetUpdatedMetricsMetadata(ctx context.Context, metricName string) (*model.UpdateMetricsMetadata, *model.ApiError) + GetUpdatedMetricsMetadata(ctx context.Context, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) CheckForLabelsInMetric(ctx context.Context, metricName string, labels []string) (bool, *model.ApiError) } diff --git a/pkg/query-service/model/metrics_explorer/summary.go b/pkg/query-service/model/metrics_explorer/summary.go index 920a5cee69..cf3d1b9d4c 100644 --- a/pkg/query-service/model/metrics_explorer/summary.go +++ b/pkg/query-service/model/metrics_explorer/summary.go @@ -65,6 +65,8 @@ type Metadata struct { MetricType string `json:"metric_type"` Description string `json:"description"` Unit string `json:"unit"` + Temporality string `json:"temporality"` + Monotonic bool `json:"monotonic"` } // Alert represents individual alerts associated with the metric. diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index e3b1f4985f..7534810d0a 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -2,7 +2,7 @@ package rules import ( "context" - "database/sql" + "fmt" "go.signoz.io/signoz/pkg/cache" "go.signoz.io/signoz/pkg/cache/memorycache" "go.signoz.io/signoz/pkg/factory/factorytest" @@ -1227,10 +1227,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { for idx, c := range cases { rows := cmock.NewRows(cols, c.values) - - cacheCols := make([]cmock.ColumnType, 0) - mock.ExpectQueryRow(".*").WillReturnRow(cmock.NewRow(cacheCols, nil)).WillReturnError(sql.ErrNoRows) - + mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error")) // We are testing the eval logic after the query is run // so we don't care about the query string here queryString := "SELECT any" @@ -1331,8 +1328,7 @@ func TestThresholdRuleNoData(t *testing.T) { for idx, c := range cases { rows := cmock.NewRows(cols, c.values) - cacheCols := make([]cmock.ColumnType, 0) - mock.ExpectQueryRow(".*").WillReturnRow(cmock.NewRow(cacheCols, nil)).WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(".*").WillReturnError(fmt.Errorf("error")) // We are testing the eval logic after the query is run // so we don't care about the query string here