feat(explorer): add related metrics (#7193)

This commit is contained in:
aniketio-ctrl 2025-02-28 13:06:41 +05:30 committed by GitHub
parent 2a56f79e1d
commit 735b565992
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 555 additions and 14 deletions

View File

@ -1143,7 +1143,7 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
searchSpansResult := []model.SearchSpansResult{
{
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
@ -1291,7 +1291,7 @@ func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.Sea
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
if r.useTraceNewSchema {
return r.SearchTracesV2(ctx, params, smartTraceAlgorithm)
@ -6177,3 +6177,194 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
return &heatmap, nil
}
// GetNameSimilarity ranks up to 30 metrics by how close their names are to
// req.CurrentMetricName, using a length-normalized Levenshtein distance
// (1.0 means identical). The current metric itself is excluded. Besides the
// similarity score, each entry carries the metric's type, temporality, and
// monotonicity so callers can build a default query without a second lookup.
func (r *ClickHouseReader) GetNameSimilarity(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError) {
	start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.End)
	query := fmt.Sprintf(`
		SELECT
			metric_name,
			any(type) as type,
			any(temporality) as temporality,
			any(is_monotonic) as monotonic,
			1 - (levenshteinDistance(?, metric_name) / greatest(NULLIF(length(?), 0), NULLIF(length(metric_name), 0))) AS name_similarity
		FROM %s.%s
		WHERE metric_name != ?
		AND unix_milli BETWEEN ? AND ?
		GROUP BY metric_name
		ORDER BY name_similarity DESC
		LIMIT 30;`,
		signozMetricDBName, tsTable)

	// Cap ClickHouse-side parallelism for explorer queries.
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, req.CurrentMetricName, req.CurrentMetricName, req.CurrentMetricName, start, end)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	result := make(map[string]metrics_explorer.RelatedMetricsScore)
	for rows.Next() {
		var metric string
		var sim float64
		var metricType v3.MetricType
		var temporality v3.Temporality
		var isMonotonic bool
		if err := rows.Scan(&metric, &metricType, &temporality, &isMonotonic, &sim); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		result[metric] = metrics_explorer.RelatedMetricsScore{
			NameSimilarity: sim,
			MetricType:     metricType,
			Temporality:    temporality,
			IsMonotonic:    isMonotonic,
		}
	}
	// Surface iteration errors (e.g. a dropped connection) instead of
	// silently returning a truncated result set.
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return result, nil
}
// GetAttributeSimilarity scores candidate metrics by how many label
// (key, value) pairs they share with req.CurrentMetricName. Pairs matching an
// equality filter from the request count extra (priority multiplier 2), and
// the candidate scan samples roughly 10% of rows to stay cheap. Returned
// AttributeSimilarity values are min-max normalized into [0, 1].
func (r *ClickHouseReader) GetAttributeSimilarity(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError) {
	start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.End)

	// Step 1: collect the target metric's label keys plus up to 10 sample
	// values per key (internal "__" labels excluded).
	extractedLabelsQuery := fmt.Sprintf(`
		SELECT
			kv.1 AS label_key,
			topK(10)(JSONExtractString(kv.2)) AS label_values
		FROM %s.%s
		ARRAY JOIN JSONExtractKeysAndValuesRaw(labels) AS kv
		WHERE metric_name = ?
		AND unix_milli between ? and ?
		AND NOT startsWith(kv.1, '__')
		GROUP BY label_key
		LIMIT 50`, signozMetricDBName, tsTable)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	labelRows, err := r.db.Query(valueCtx, extractedLabelsQuery, req.CurrentMetricName, start, end)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	var targetKeys []string
	var targetValues []string
	for labelRows.Next() {
		var key string
		var value []string
		if err := labelRows.Scan(&key, &value); err != nil {
			labelRows.Close()
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		targetKeys = append(targetKeys, key)
		targetValues = append(targetValues, value...)
	}
	// Close the first result set before issuing the second query, and fail on
	// iteration errors rather than proceeding with a truncated label set.
	labelRows.Close()
	if err := labelRows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	targetKeysList := "'" + strings.Join(targetKeys, "', '") + "'"
	targetValuesList := "'" + strings.Join(targetValues, "', '") + "'"

	// Priority pairs: equality filters from the request get a higher weight.
	// NOTE(review): label keys/values and filter keys/values are interpolated
	// directly into the SQL below; if any can contain quotes this is an
	// injection risk — consider binding or escaping these.
	var priorityList []string
	for _, f := range req.Filters.Items {
		if f.Operator == v3.FilterOperatorEqual {
			priorityList = append(priorityList, fmt.Sprintf("tuple('%s', '%s')", f.Key.Key, f.Value))
		}
	}
	priorityListString := strings.Join(priorityList, ", ")

	// Step 2: score every sampled metric by raw and weighted label overlap,
	// and collect the priority pairs that actually matched per metric.
	candidateLabelsQuery := fmt.Sprintf(`
		WITH
			arrayDistinct([%s]) AS filter_keys,
			arrayDistinct([%s]) AS filter_values,
			[%s] AS priority_pairs_input,
			%d AS priority_multiplier
		SELECT
			metric_name,
			any(type) as type,
			any(temporality) as temporality,
			any(is_monotonic) as monotonic,
			SUM(
				arraySum(
					kv -> if(has(filter_keys, kv.1) AND has(filter_values, kv.2), 1, 0),
					JSONExtractKeysAndValues(labels, 'String')
				)
			)::UInt64 AS raw_match_count,
			SUM(
				arraySum(
					kv ->
						if(
							arrayExists(pr -> pr.1 = kv.1 AND pr.2 = kv.2, priority_pairs_input),
							priority_multiplier,
							0
						),
					JSONExtractKeysAndValues(labels, 'String')
				)
			)::UInt64 AS weighted_match_count,
			toJSONString(
				arrayDistinct(
					arrayFlatten(
						groupArray(
							arrayFilter(
								kv -> arrayExists(pr -> pr.1 = kv.1 AND pr.2 = kv.2, priority_pairs_input),
								JSONExtractKeysAndValues(labels, 'String')
							)
						)
					)
				)
			) AS priority_pairs
		FROM %s.%s
		WHERE rand() %% 100 < 10
		AND unix_milli between ? and ?
		GROUP BY metric_name
		ORDER BY weighted_match_count DESC, raw_match_count DESC
		LIMIT 30
	`,
		targetKeysList, targetValuesList, priorityListString, 2,
		signozMetricDBName, tsTable)

	rows, err := r.db.Query(valueCtx, candidateLabelsQuery, start, end)
	if err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()

	result := make(map[string]metrics_explorer.RelatedMetricsScore)
	attributeMap := make(map[string]uint64)
	for rows.Next() {
		var metric string
		var metricType v3.MetricType
		var temporality v3.Temporality
		var isMonotonic bool
		var weightedMatchCount, rawMatchCount uint64
		var priorityPairsJSON string
		if err := rows.Scan(&metric, &metricType, &temporality, &isMonotonic, &rawMatchCount, &weightedMatchCount, &priorityPairsJSON); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		// Blend weighted and raw counts; raw matches contribute at 1/10.
		attributeMap[metric] = weightedMatchCount + (rawMatchCount)/10

		var priorityPairs [][]string
		if err := json.Unmarshal([]byte(priorityPairsJSON), &priorityPairs); err != nil {
			// A malformed pair payload degrades to "no priority pairs".
			priorityPairs = [][]string{}
		}
		result[metric] = metrics_explorer.RelatedMetricsScore{
			AttributeSimilarity: float64(attributeMap[metric]), // normalized below
			Filters:             priorityPairs,
			MetricType:          metricType,
			Temporality:         temporality,
			IsMonotonic:         isMonotonic,
		}
	}
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}

	// Min-max normalize the blended counts into [0, 1].
	normalizeMap := utils.NormalizeMap(attributeMap)
	for metric := range result {
		if score, exists := normalizeMap[metric]; exists {
			metricScore := result[metric]
			metricScore.AttributeSimilarity = score
			result[metric] = metricScore
		}
	}
	return result, nil
}

View File

@ -538,7 +538,7 @@ func countPanelsInDashboard(inputData map[string]interface{}) model.DashboardsIn
}
}
func GetDashboardsWithMetricName(ctx context.Context, metricName string) ([]map[string]string, *model.ApiError) {
func GetDashboardsWithMetricNames(ctx context.Context, metricNames []string) (map[string][]map[string]string, *model.ApiError) {
// Get all dashboards first
query := `SELECT uuid, data FROM dashboards`
@ -554,8 +554,13 @@ func GetDashboardsWithMetricName(ctx context.Context, metricName string) ([]map[
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
// Initialize result map for each metric
result := make(map[string][]map[string]string)
// for _, metricName := range metricNames {
// result[metricName] = []map[string]string{}
// }
// Process the JSON data in Go
var result []map[string]string
for _, dashboard := range dashboards {
var dashData map[string]interface{}
if err := json.Unmarshal(dashboard.Data, &dashData); err != nil {
@ -607,13 +612,18 @@ func GetDashboardsWithMetricName(ctx context.Context, metricName string) ([]map[
continue
}
if key, ok := aggregateAttr["key"].(string); ok && strings.TrimSpace(key) == metricName {
result = append(result, map[string]string{
"dashboard_id": dashboard.Uuid,
"widget_title": widgetTitle,
"widget_id": widgetID,
"dashboard_title": dashTitle,
})
if key, ok := aggregateAttr["key"].(string); ok {
// Check if this metric is in our list of interest
for _, metricName := range metricNames {
if strings.TrimSpace(key) == metricName {
result[metricName] = append(result[metricName], map[string]string{
"dashboard_id": dashboard.Uuid,
"widget_title": widgetTitle,
"widget_id": widgetID,
"dashboard_title": dashTitle,
})
}
}
}
}
}

View File

@ -628,6 +628,9 @@ func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *AuthMiddlewar
router.HandleFunc("/api/v1/metrics/treemap",
am.ViewAccess(ah.GetTreeMap)).
Methods(http.MethodPost)
router.HandleFunc("/api/v1/metrics/related",
am.ViewAccess(ah.GetRelatedMetrics)).
Methods(http.MethodPost)
}
func Intersection(a, b []int) (c []int) {

View File

@ -68,3 +68,11 @@ func ParseTreeMapMetricsParams(r *http.Request) (*metrics_explorer.TreeMapMetric
return treeMapMetricParams, nil
}
// ParseRelatedMetricsParams decodes the JSON payload of a related-metrics
// request from the HTTP request body. A malformed body yields an
// ErrorBadData API error.
func ParseRelatedMetricsParams(r *http.Request) (*metrics_explorer.RelatedMetricsRequest, *model.ApiError) {
	params := new(metrics_explorer.RelatedMetricsRequest)
	if decodeErr := json.NewDecoder(r.Body).Decode(params); decodeErr != nil {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", decodeErr)}
	}
	return params, nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"sort"
"time"
"go.uber.org/zap"
@ -152,7 +153,9 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
})
g.Go(func() error {
data, err := dashboards.GetDashboardsWithMetricName(ctx, metricName)
var metricNames []string
metricNames = append(metricNames, metricName)
data, err := dashboards.GetDashboardsWithMetricNames(ctx, metricNames)
if err != nil {
return err
}
@ -233,3 +236,214 @@ func (receiver *SummaryService) GetMetricsTreemap(ctx context.Context, params *m
return nil, nil
}
}
// GetRelatedMetrics returns metrics related to params.CurrentMetricName,
// ranked by a weighted blend of name similarity (70%) and attribute/label
// similarity (30%). Each related metric is enriched with a default builder
// query plus the dashboards and alerts that reference it.
func (receiver *SummaryService) GetRelatedMetrics(ctx context.Context, params *metrics_explorer.RelatedMetricsRequest) (*metrics_explorer.RelatedMetricsResponse, *model.ApiError) {
	// Name similarity is cheap; a failure here is fatal.
	nameSimilarityScores, err := receiver.reader.GetNameSimilarity(ctx, params)
	if err != nil {
		return nil, err
	}

	// Attribute similarity scans label sets and can be slow, so bound it at
	// 20s and degrade gracefully to name-only scoring on timeout.
	attrCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()
	attrSimilarityScores, err := receiver.reader.GetAttributeSimilarity(attrCtx, params)
	if err != nil {
		if errors.Is(err.Err, context.DeadlineExceeded) {
			zap.L().Warn("Attribute similarity calculation timed out, proceeding with name similarity only")
			attrSimilarityScores = make(map[string]metrics_explorer.RelatedMetricsScore)
		} else {
			return nil, err
		}
	}

	// Merge the two score maps; final score = 0.7*name + 0.3*attribute.
	finalScores := make(map[string]float64)
	relatedMetricsMap := make(map[string]metrics_explorer.RelatedMetricsScore)

	for metric, nameScore := range nameSimilarityScores {
		attrScore, exists := attrSimilarityScores[metric]
		if exists {
			relatedMetricsMap[metric] = metrics_explorer.RelatedMetricsScore{
				NameSimilarity:      nameScore.NameSimilarity,
				AttributeSimilarity: attrScore.AttributeSimilarity,
				Filters:             attrScore.Filters,
				MetricType:          attrScore.MetricType,
				Temporality:         attrScore.Temporality,
				IsMonotonic:         attrScore.IsMonotonic,
			}
		} else {
			relatedMetricsMap[metric] = nameScore
		}
		finalScores[metric] = nameScore.NameSimilarity*0.7 + relatedMetricsMap[metric].AttributeSimilarity*0.3
	}

	// Metrics that only matched on attributes still participate; their name
	// component contributes zero.
	for metric, attrScore := range attrSimilarityScores {
		if _, exists := nameSimilarityScores[metric]; !exists {
			relatedMetricsMap[metric] = metrics_explorer.RelatedMetricsScore{
				AttributeSimilarity: attrScore.AttributeSimilarity,
				Filters:             attrScore.Filters,
				MetricType:          attrScore.MetricType,
				Temporality:         attrScore.Temporality,
				IsMonotonic:         attrScore.IsMonotonic,
			}
			finalScores[metric] = attrScore.AttributeSimilarity * 0.3
		}
	}

	type metricScore struct {
		Name  string
		Score float64
	}
	sortedScores := make([]metricScore, 0, len(finalScores))
	for metric, score := range finalScores {
		sortedScores = append(sortedScores, metricScore{
			Name:  metric,
			Score: score,
		})
	}
	// Sort by score descending; tie-break on name so the ordering is
	// deterministic despite random map iteration (sort.Slice is not stable).
	sort.Slice(sortedScores, func(i, j int) bool {
		if sortedScores[i].Score == sortedScores[j].Score {
			return sortedScores[i].Name < sortedScores[j].Name
		}
		return sortedScores[i].Score > sortedScores[j].Score
	})

	metricNames := make([]string, len(sortedScores))
	for i, ms := range sortedScores {
		metricNames[i] = ms.Name
	}

	// Fetch related dashboards and alerts concurrently.
	g, ctx := errgroup.WithContext(ctx)
	dashboardsRelatedData := make(map[string][]metrics_explorer.Dashboard)
	alertsRelatedData := make(map[string][]metrics_explorer.Alert)

	g.Go(func() error {
		names, apiError := dashboards.GetDashboardsWithMetricNames(ctx, metricNames)
		if apiError != nil {
			return apiError
		}
		if names != nil {
			// Round-trip through JSON to convert the loosely typed
			// map[string][]map[string]string into typed Dashboard structs.
			jsonData, err := json.Marshal(names)
			if err != nil {
				zap.L().Error("Error marshalling dashboard data", zap.Error(err))
				return &model.ApiError{Typ: "MarshallingErr", Err: err}
			}
			err = json.Unmarshal(jsonData, &dashboardsRelatedData)
			if err != nil {
				zap.L().Error("Error unmarshalling dashboard data", zap.Error(err))
				return &model.ApiError{Typ: "UnMarshallingErr", Err: err}
			}
		}
		return nil
	})

	g.Go(func() error {
		rulesData, apiError := receiver.rulesManager.GetAlertDetailsForMetricNames(ctx, metricNames)
		if apiError != nil {
			return apiError
		}
		for s, gettableRules := range rulesData {
			var metricsRules []metrics_explorer.Alert
			for _, rule := range gettableRules {
				metricsRules = append(metricsRules, metrics_explorer.Alert{AlertID: rule.Id, AlertName: rule.AlertName})
			}
			alertsRelatedData[s] = metricsRules
		}
		return nil
	})

	// Best-effort early exit if the caller already went away; g.Wait below
	// still observes cancellation through the errgroup context.
	if ctx.Err() != nil {
		return nil, &model.ApiError{Typ: "ContextCanceled", Err: ctx.Err()}
	}

	if err := g.Wait(); err != nil {
		var apiErr *model.ApiError
		if errors.As(err, &apiErr) {
			return nil, apiErr
		}
		return nil, &model.ApiError{Typ: "InternalError", Err: err}
	}

	// Assemble the response in ranked order.
	var response metrics_explorer.RelatedMetricsResponse
	for _, ms := range sortedScores {
		relatedMetric := metrics_explorer.RelatedMetrics{
			Name:  ms.Name,
			Query: getQueryRangeForRelateMetricsList(ms.Name, relatedMetricsMap[ms.Name]),
		}
		if dashboardsDetails, ok := dashboardsRelatedData[ms.Name]; ok {
			relatedMetric.Dashboards = dashboardsDetails
		}
		if alerts, ok := alertsRelatedData[ms.Name]; ok {
			relatedMetric.Alerts = alerts
		}
		response.RelatedMetrics = append(response.RelatedMetrics, relatedMetric)
	}

	return &response, nil
}
// getQueryRangeForRelateMetricsList builds the default builder query for a
// related metric, seeding its filter set with the equality label pairs that
// matched during attribute-similarity scoring and picking aggregations based
// on the metric's type and temporality.
func getQueryRangeForRelateMetricsList(metricName string, scores metrics_explorer.RelatedMetricsScore) *v3.BuilderQuery {
	var items []v3.FilterItem
	for _, pair := range scores.Filters {
		// Each entry is expected to be a (key, value) pair; anything
		// shorter is malformed and silently dropped.
		if len(pair) < 2 {
			continue
		}
		items = append(items, v3.FilterItem{
			Key: v3.AttributeKey{
				Key:      pair[0],
				IsColumn: false,
				IsJSON:   false,
			},
			Value:    pair[1],
			Operator: v3.FilterOperatorEqual,
		})
	}

	// Combine the collected pairs with AND; leave filters nil when empty.
	var filterSet *v3.FilterSet
	if len(items) > 0 {
		filterSet = &v3.FilterSet{
			Operator: "AND",
			Items:    items,
		}
	}

	q := &v3.BuilderQuery{
		QueryName:  metricName,
		DataSource: v3.DataSourceMetrics,
		Expression: metricName, // metric name doubles as the expression
		Filters:    filterSet,
	}

	// Non-monotonic cumulative sums behave like gauges, so aggregate them
	// accordingly.
	if scores.MetricType == v3.MetricTypeSum && !scores.IsMonotonic && scores.Temporality == v3.Cumulative {
		scores.MetricType = v3.MetricTypeGauge
	}
	switch scores.MetricType {
	case v3.MetricTypeGauge:
		q.TimeAggregation = v3.TimeAggregationAvg
		q.SpaceAggregation = v3.SpaceAggregationAvg
	case v3.MetricTypeSum:
		q.TimeAggregation = v3.TimeAggregationRate
		q.SpaceAggregation = v3.SpaceAggregationSum
	case v3.MetricTypeHistogram:
		q.SpaceAggregation = v3.SpaceAggregationPercentile95
	}

	q.AggregateAttribute = v3.AttributeKey{
		Key: metricName,
		// NOTE(review): casting MetricType into AttributeKeyType looks
		// suspicious — confirm the two enums intentionally share values.
		Type: v3.AttributeKeyType(scores.MetricType),
	}
	q.StepInterval = 60
	return q
}

View File

@ -102,3 +102,23 @@ func (aH *APIHandler) GetTreeMap(w http.ResponseWriter, r *http.Request) {
aH.Respond(w, result)
}
// GetRelatedMetrics handles POST /api/v1/metrics/related: it parses the
// request body into a RelatedMetricsRequest and responds with the ranked
// related-metrics list.
func (aH *APIHandler) GetRelatedMetrics(w http.ResponseWriter, r *http.Request) {
	// Buffer the body so downstream readers can re-read it; a read failure
	// is a client error, not something to silently ignore.
	bodyBytes, err := io.ReadAll(r.Body)
	if err != nil {
		zap.L().Error("error reading request body", zap.Error(err))
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

	ctx := r.Context()
	params, apiError := explorer.ParseRelatedMetricsParams(r)
	if apiError != nil {
		zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}

	result, apiError := aH.SummaryService.GetRelatedMetrics(ctx, params)
	if apiError != nil {
		zap.L().Error("error getting related metrics", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}

	aH.Respond(w, result)
}

View File

@ -132,6 +132,9 @@ type Reader interface {
GetMetricsTimeSeriesPercentage(ctx context.Context, request *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError)
GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError)
GetNameSimilarity(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError)
GetAttributeSimilarity(ctx context.Context, req *metrics_explorer.RelatedMetricsRequest) (map[string]metrics_explorer.RelatedMetricsScore, *model.ApiError)
}
type Querier interface {

View File

@ -122,3 +122,30 @@ var AvailableColumnFilterMap = map[string]bool{
"metric_unit": true,
"metric_type": true,
}
// RelatedMetricsScore aggregates the similarity signals computed for one
// candidate related metric, plus the metric metadata used later to build a
// default query for it.
type RelatedMetricsScore struct {
	// AttributeSimilarity is the label-overlap score, normalized to [0, 1].
	AttributeSimilarity float64
	// NameSimilarity is the Levenshtein-based name similarity in [0, 1].
	NameSimilarity float64
	// Filters holds matched (key, value) label pairs from the request's
	// equality filters; each inner slice is expected to be a two-element pair.
	Filters [][]string
	MetricType v3.MetricType
	Temporality v3.Temporality
	IsMonotonic bool
}

// RelatedMetricsRequest is the payload of POST /api/v1/metrics/related.
type RelatedMetricsRequest struct {
	CurrentMetricName string `json:"currentMetricName"`
	// Start and End bound the lookup window in epoch milliseconds —
	// presumably; confirm against WhichTSTableToUse usage.
	Start int64 `json:"start"`
	End int64 `json:"end"`
	Filters v3.FilterSet `json:"filters"`
}

// RelatedMetricsResponse is the response body for the related-metrics endpoint.
type RelatedMetricsResponse struct {
	RelatedMetrics []RelatedMetrics `json:"related_metrics"`
}

// RelatedMetrics describes one related metric: its name, a ready-to-run
// builder query, and the dashboards/alerts that reference it.
type RelatedMetrics struct {
	Name string `json:"name"`
	Query *v3.BuilderQuery `json:"query"`
	Dashboards []Dashboard `json:"dashboards"`
	Alerts []Alert `json:"alerts"`
}

View File

@ -596,6 +596,19 @@ const (
Cumulative Temporality = "Cumulative"
)
// Scan implements sql.Scanner so a Temporality can be populated directly from
// database query results. NULL maps to the empty temporality; both string and
// []byte payloads are accepted, since drivers differ in how they surface
// textual columns.
func (t *Temporality) Scan(src interface{}) error {
	if src == nil {
		*t = ""
		return nil
	}
	switch v := src.(type) {
	case string:
		*t = Temporality(v)
		return nil
	case []byte:
		// Some drivers deliver text columns as raw bytes.
		*t = Temporality(v)
		return nil
	default:
		return fmt.Errorf("failed to scan Temporality: %v", src)
	}
}
type TimeAggregation string
const (
@ -648,6 +661,19 @@ const (
MetricTypeExponentialHistogram MetricType = "ExponentialHistogram"
)
// Scan implements sql.Scanner so a MetricType can be populated directly from
// database query results. NULL maps to the empty metric type; both string and
// []byte payloads are accepted, since drivers differ in how they surface
// textual columns.
func (m *MetricType) Scan(src interface{}) error {
	if src == nil {
		*m = ""
		return nil
	}
	switch v := src.(type) {
	case string:
		*m = MetricType(v)
		return nil
	case []byte:
		// Some drivers deliver text columns as raw bytes.
		*m = MetricType(v)
		return nil
	default:
		return fmt.Errorf("failed to scan MetricType: %v", src)
	}
}
type SpaceAggregation string
const (

View File

@ -830,13 +830,13 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
return alertCount, apiErr
}
func (m *Manager) GetAlertDetailsForMetricNames(ctx context.Context, metricNames []string) (map[string][]GettableRule, error) {
func (m *Manager) GetAlertDetailsForMetricNames(ctx context.Context, metricNames []string) (map[string][]GettableRule, *model.ApiError) {
result := make(map[string][]GettableRule)
rules, err := m.ruleDB.GetStoredRules(ctx)
if err != nil {
zap.L().Error("Error getting stored rules", zap.Error(err))
return nil, err
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
metricRulesMap := make(map[string][]GettableRule)

View File

@ -350,3 +350,42 @@ func GetEpochNanoSecs(epoch int64) int64 {
}
return temp * int64(math.Pow(10, float64(19-count)))
}
// NormalizeMap rescales every value in data into [0, 1] using min-max
// normalization. An empty (or nil) input yields nil; when all values are
// identical, every key maps to 1.0 to avoid dividing by zero.
func NormalizeMap(data map[string]uint64) map[string]float64 {
	if len(data) == 0 {
		return nil
	}

	// Seed min/max from an arbitrary entry, then fold in the rest.
	var lo, hi uint64
	seeded := false
	for _, v := range data {
		switch {
		case !seeded:
			lo, hi = v, v
			seeded = true
		case v < lo:
			lo = v
		case v > hi:
			hi = v
		}
	}

	normalized := make(map[string]float64, len(data))
	if lo == hi {
		// Degenerate case: a flat distribution normalizes to all ones.
		for k := range data {
			normalized[k] = 1.0
		}
		return normalized
	}

	span := float64(hi - lo)
	for k, v := range data {
		normalized[k] = float64(v-lo) / span
	}
	return normalized
}