From 4134eb621c950d9f90bc89b3834c334ee02afd60 Mon Sep 17 00:00:00 2001 From: aniketio-ctrl Date: Thu, 27 Feb 2025 12:25:58 +0530 Subject: [PATCH] feat(summary): added alerts and basic query updates (#7174) --- .../app/clickhouseReader/reader.go | 137 ++++++++++++------ pkg/query-service/app/http_handler.go | 2 +- .../app/metricsexplorer/summary.go | 56 +++++-- pkg/query-service/interfaces/interface.go | 3 +- .../model/metrics_explorer/summary.go | 2 +- pkg/query-service/rules/manager.go | 74 ++++++++++ 6 files changed, 212 insertions(+), 62 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index d6ab76ec88..01e0d0b21a 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -5692,15 +5692,32 @@ func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *met return response, nil } -func (r *ClickHouseReader) GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError) { - query := fmt.Sprintf("SELECT COUNT(*) AS data_points, MAX(unix_milli) AS last_received_time FROM %s.%s WHERE metric_name = ?", signozMetricDBName, signozSampleTableName) - var lastRecievedTimestamp int64 // Changed from uint64 to int64 +func (r *ClickHouseReader) GetMetricsDataPoints(ctx context.Context, metricName string) (uint64, *model.ApiError) { + query := fmt.Sprintf(`SELECT + sum(count) as data_points +FROM %s.%s +WHERE metric_name = ? +`, signozMetricDBName, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME) var dataPoints uint64 - err := r.db.QueryRow(ctx, query, metricName).Scan(&dataPoints, &lastRecievedTimestamp) + err := r.db.QueryRow(ctx, query, metricName).Scan(&dataPoints) if err != nil { - return 0, 0, &model.ApiError{Typ: "ClickHouseError", Err: err} + return 0, &model.ApiError{Typ: "ClickHouseError", Err: err} } - return dataPoints, uint64(lastRecievedTimestamp), nil // Convert to uint64 before returning + return dataPoints, nil // Convert to uint64 before returning +} + +func (r *ClickHouseReader) GetMetricsLastReceived(ctx context.Context, metricName string) (int64, *model.ApiError) { + query := fmt.Sprintf(`SELECT + MAX(unix_milli) AS last_received_time +FROM %s.%s +WHERE metric_name = ? +`, signozMetricDBName, signozSampleTableName) + var lastReceived int64 + err := r.db.QueryRow(ctx, query, metricName).Scan(&lastReceived) + if err != nil { + return 0, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + return lastReceived, nil // Convert to uint64 before returning } func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) { @@ -5854,33 +5871,48 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_ orderByClauseFirstQuery = "" } - sampleQuery := fmt.Sprintf( + var sb strings.Builder + + sb.WriteString(fmt.Sprintf( `SELECT - s.samples, - s.metric_name, - s.unix_milli AS lastReceived - FROM ( - SELECT - metric_name, - %s AS samples, - max(unix_milli) as unix_milli - FROM %s.%s - WHERE fingerprint IN ( - SELECT fingerprint - FROM %s.%s - WHERE unix_milli BETWEEN ? AND ? - %s - AND metric_name IN (%s) - GROUP BY fingerprint - ) - AND metric_name in (%s) - GROUP BY metric_name - ) AS s - %s - LIMIT %d OFFSET %d;`, - countExp, signozMetricDBName, sampleTable, signozMetricDBName, localTsTable, - whereClause, metricsList, metricsList, orderByClauseFirstQuery, - req.Limit, req.Offset) + s.samples, + s.metric_name, + s.unix_milli AS lastReceived + FROM ( + SELECT + metric_name, + %s AS samples, + max(unix_milli) as unix_milli + FROM %s.%s + `, countExp, signozMetricDBName, sampleTable)) + + // Conditionally add the fingerprint subquery if `whereClause` is present + if whereClause != "" { + sb.WriteString(fmt.Sprintf( + `WHERE fingerprint IN ( + SELECT fingerprint + FROM %s.%s + WHERE unix_milli BETWEEN ? AND ? + %s + AND metric_name IN (%s) + GROUP BY fingerprint + ) + AND metric_name IN (%s) `, + signozMetricDBName, localTsTable, whereClause, metricsList, metricsList)) + } else { + sb.WriteString(fmt.Sprintf( + `WHERE metric_name IN (%s) `, metricsList)) + } + + sb.WriteString(`GROUP BY metric_name ) AS s `) + + if orderByClauseFirstQuery != "" { + sb.WriteString(orderByClauseFirstQuery) + } + + sb.WriteString(fmt.Sprintf(" LIMIT %d OFFSET %d;", req.Limit, req.Offset)) + + sampleQuery := sb.String() args = append(args, start, end) rows, err = r.db.Query(valueCtx, sampleQuery, args...) @@ -6063,8 +6095,10 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req // Format metric names for query metricsList := "'" + strings.Join(metricNames, "', '") + "'" - // Construct the sample percentage query - sampleQuery := fmt.Sprintf( + // Build query using string builder for better performance + var sb strings.Builder + + sb.WriteString(fmt.Sprintf( `WITH TotalSamples AS ( SELECT %s AS total_samples FROM %s.%s @@ -6079,9 +6113,15 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req SELECT metric_name, %s AS samples - FROM %s.%s - WHERE fingerprint IN - ( + FROM %s.%s`, + countExp, signozMetricDBName, sampleTable, // Total samples + countExp, signozMetricDBName, sampleTable, // Inner select samples + )) + + // Conditionally add the fingerprint subquery if whereClause is present + if whereClause != "" { + sb.WriteString(fmt.Sprintf( + ` WHERE fingerprint IN ( SELECT fingerprint FROM %s.%s WHERE unix_milli BETWEEN ? AND ? @@ -6089,18 +6129,27 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req AND metric_name IN (%s) GROUP BY fingerprint ) - AND metric_name IN (%s) + AND metric_name IN (%s)`, + signozMetricDBName, localTsTable, whereClause, metricsList, + metricsList, + )) + } else { + sb.WriteString(fmt.Sprintf( + ` WHERE metric_name IN (%s)`, + metricsList, + )) + } + + sb.WriteString(` GROUP BY metric_name ) AS s JOIN TotalSamples t ON 1 = 1 ORDER BY percentage DESC - LIMIT %d;`, - countExp, signozMetricDBName, sampleTable, // Total samples - countExp, signozMetricDBName, sampleTable, // Inner select samples - signozMetricDBName, localTsTable, whereClause, metricsList, // Subquery conditions - metricsList, req.Limit, // Final conditions - ) + LIMIT ?;`) + sampleQuery := sb.String() + + // Add start and end time to args args = append(args, start, end) // Execute the sample percentage query diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index e9a94803dc..d0d31577ac 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -219,7 +219,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2) pvcsRepo := inframetrics.NewPvcsRepo(opts.Reader, querierv2) //explorerCache := metricsexplorer.NewExplorerCache(metricsexplorer.WithCache(opts.Cache)) - summaryService := metricsexplorer.NewSummaryService(opts.Reader, querierv2) + summaryService := metricsexplorer.NewSummaryService(opts.Reader, opts.RuleManager) aH := &APIHandler{ reader: opts.Reader, diff --git a/pkg/query-service/app/metricsexplorer/summary.go b/pkg/query-service/app/metricsexplorer/summary.go index f8f511f53f..952da81439 100644 --- a/pkg/query-service/app/metricsexplorer/summary.go +++ b/pkg/query-service/app/metricsexplorer/summary.go @@ -3,6 +3,7 @@ package metricsexplorer import ( "context" "encoding/json" + "errors" "time" "go.uber.org/zap" @@ -12,16 +13,17 @@ import ( "go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model/metrics_explorer" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/rules" "golang.org/x/sync/errgroup" ) type SummaryService struct { - reader interfaces.Reader - querierV2 interfaces.Querier + reader interfaces.Reader + rulesManager *rules.Manager } -func NewSummaryService(reader interfaces.Reader, querierV2 interfaces.Querier) *SummaryService { - return &SummaryService{reader: reader, querierV2: querierV2} +func NewSummaryService(reader interfaces.Reader, alertManager *rules.Manager) *SummaryService { + return &SummaryService{reader: reader, rulesManager: alertManager} } func (receiver *SummaryService) FilterKeys(ctx context.Context, params *metrics_explorer.FilterKeyRequest) (*metrics_explorer.FilterKeyResponse, *model.ApiError) { @@ -102,18 +104,24 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam return nil }) - // Call 2: GetMetricsDataPointsAndLastReceived g.Go(func() error { - dataPoints, lastReceived, err := receiver.reader.GetMetricsDataPointsAndLastReceived(ctx, metricName) + dataPoints, err := receiver.reader.GetMetricsDataPoints(ctx, metricName) if err != nil { return err } metricDetailsDTO.Samples = dataPoints + return nil + }) + + g.Go(func() error { + lastReceived, err := receiver.reader.GetMetricsLastReceived(ctx, metricName) + if err != nil { + return err + } metricDetailsDTO.LastReceived = lastReceived return nil }) - // Call 3: GetTotalTimeSeriesForMetricName g.Go(func() error { totalSeries, err := receiver.reader.GetTotalTimeSeriesForMetricName(ctx, metricName) if err != nil { @@ -123,7 +131,6 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam return nil }) - // Call 4: GetActiveTimeSeriesForMetricName g.Go(func() error { activeSeries, err := receiver.reader.GetActiveTimeSeriesForMetricName(ctx, metricName, 120*time.Minute) if err != nil { @@ -133,7 +140,6 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam return nil }) - // Call 5: GetAttributesForMetricName g.Go(func() error { attributes, err := receiver.reader.GetAttributesForMetricName(ctx, metricName) if err != nil { @@ -145,7 +151,6 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam return nil }) - // Call 6: GetDashboardsWithMetricName g.Go(func() error { data, err := dashboards.GetDashboardsWithMetricName(ctx, metricName) if err != nil { @@ -169,13 +174,30 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam return nil }) + g.Go(func() error { + var metrics []string + var metricsAlerts []metrics_explorer.Alert + metrics = append(metrics, metricName) + data, err := receiver.rulesManager.GetAlertDetailsForMetricNames(ctx, metrics) + if err != nil { + return err + } + if rulesLists, ok := data[metricName]; ok { + for _, rule := range rulesLists { + metricsAlerts = append(metricsAlerts, metrics_explorer.Alert{AlertName: rule.AlertName, AlertID: rule.Id}) + } + } + metricDetailsDTO.Alerts = metricsAlerts + return nil + }) + // Wait for all goroutines and handle any errors if err := g.Wait(); err != nil { - // Type assert to check if it's already an ApiError - if apiErr, ok := err.(*model.ApiError); ok { + + var apiErr *model.ApiError + if errors.As(err, &apiErr) { return metrics_explorer.MetricDetailsDTO{}, apiErr } - // If it's not an ApiError, wrap it in one return metrics_explorer.MetricDetailsDTO{}, &model.ApiError{Typ: "InternalError", Err: err} } @@ -194,14 +216,18 @@ func (receiver *SummaryService) GetMetricsTreemap(ctx context.Context, params *m if apiError != nil { return nil, apiError } - response.TimeSeries = *cardinality + if cardinality != nil { + response.TimeSeries = *cardinality + } return &response, nil case metrics_explorer.SamplesTreeMap: dataPoints, apiError := receiver.reader.GetMetricsSamplesPercentage(ctx, params) if apiError != nil { return nil, apiError } - response.Samples = *dataPoints + if dataPoints != nil { + response.Samples = *dataPoints + } return &response, nil default: return nil, nil diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index db2204f637..ff4aedf729 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -122,7 +122,8 @@ type Reader interface { GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError) - GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError) + GetMetricsDataPoints(ctx context.Context, metricName string) (uint64, *model.ApiError) + GetMetricsLastReceived(ctx context.Context, metricName string) (int64, *model.ApiError) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError) GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError) diff --git a/pkg/query-service/model/metrics_explorer/summary.go b/pkg/query-service/model/metrics_explorer/summary.go index 2705bd8b5f..ba80bc45f3 100644 --- a/pkg/query-service/model/metrics_explorer/summary.go +++ b/pkg/query-service/model/metrics_explorer/summary.go @@ -89,7 +89,7 @@ type MetricDetailsDTO struct { Samples uint64 `json:"samples"` TimeSeriesTotal uint64 `json:"timeSeriesTotal"` TimeSeriesActive uint64 `json:"timeSeriesActive"` - LastReceived uint64 `json:"lastReceived"` + LastReceived int64 `json:"lastReceived"` Attributes []Attribute `json:"attributes"` Metadata Metadata `json:"metadata"` Alerts []Alert `json:"alerts"` diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 84e055cfa0..6169708685 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -829,3 +829,77 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m return alertCount, apiErr } + +func (m *Manager) GetAlertDetailsForMetricNames(ctx context.Context, metricNames []string) (map[string][]GettableRule, error) { + result := make(map[string][]GettableRule) + + rules, err := m.ruleDB.GetStoredRules(ctx) + if err != nil { + zap.L().Error("Error getting stored rules", zap.Error(err)) + return nil, err + } + + metricRulesMap := make(map[string][]GettableRule) + + for _, storedRule := range rules { + var rule GettableRule + if err := json.Unmarshal([]byte(storedRule.Data), &rule); err != nil { + zap.L().Error("Invalid rule data", zap.Error(err)) + continue + } + + if rule.AlertType != AlertTypeMetric || rule.RuleCondition == nil || rule.RuleCondition.CompositeQuery == nil { + continue + } + + rule.Id = fmt.Sprintf("%d", storedRule.Id) + rule.CreatedAt = storedRule.CreatedAt + rule.CreatedBy = storedRule.CreatedBy + rule.UpdatedAt = storedRule.UpdatedAt + rule.UpdatedBy = storedRule.UpdatedBy + + for _, query := range rule.RuleCondition.CompositeQuery.BuilderQueries { + if query.AggregateAttribute.Key != "" { + metricRulesMap[query.AggregateAttribute.Key] = append(metricRulesMap[query.AggregateAttribute.Key], rule) + } + } + + for _, query := range rule.RuleCondition.CompositeQuery.PromQueries { + if query.Query != "" { + for _, metricName := range metricNames { + if strings.Contains(query.Query, metricName) { + metricRulesMap[metricName] = append(metricRulesMap[metricName], rule) + } + } + } + } + + for _, query := range rule.RuleCondition.CompositeQuery.ClickHouseQueries { + if query.Query != "" { + for _, metricName := range metricNames { + if strings.Contains(query.Query, metricName) { + metricRulesMap[metricName] = append(metricRulesMap[metricName], rule) + } + } + } + } + } + + for _, metricName := range metricNames { + if rules, exists := metricRulesMap[metricName]; exists { + seen := make(map[string]bool) + uniqueRules := make([]GettableRule, 0) + + for _, rule := range rules { + if !seen[rule.Id] { + seen[rule.Id] = true + uniqueRules = append(uniqueRules, rule) + } + } + + result[metricName] = uniqueRules + } + } + + return result, nil +}