feat(summary): added alerts and basic query updates (#7174)

aniketio-ctrl 2025-02-27 12:25:58 +05:30 committed by GitHub
parent a3bc290500
commit 4134eb621c
6 changed files with 212 additions and 62 deletions

View File

@@ -5692,15 +5692,32 @@ func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *met
return response, nil
}
func (r *ClickHouseReader) GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError) {
query := fmt.Sprintf("SELECT COUNT(*) AS data_points, MAX(unix_milli) AS last_received_time FROM %s.%s WHERE metric_name = ?", signozMetricDBName, signozSampleTableName)
var lastRecievedTimestamp int64 // Changed from uint64 to int64
func (r *ClickHouseReader) GetMetricsDataPoints(ctx context.Context, metricName string) (uint64, *model.ApiError) {
query := fmt.Sprintf(`SELECT
sum(count) as data_points
FROM %s.%s
WHERE metric_name = ?
`, signozMetricDBName, constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME)
var dataPoints uint64
err := r.db.QueryRow(ctx, query, metricName).Scan(&dataPoints, &lastRecievedTimestamp)
err := r.db.QueryRow(ctx, query, metricName).Scan(&dataPoints)
if err != nil {
return 0, 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
}
return dataPoints, uint64(lastRecievedTimestamp), nil // Convert to uint64 before returning
return dataPoints, nil
}
func (r *ClickHouseReader) GetMetricsLastReceived(ctx context.Context, metricName string) (int64, *model.ApiError) {
query := fmt.Sprintf(`SELECT
MAX(unix_milli) AS last_received_time
FROM %s.%s
WHERE metric_name = ?
`, signozMetricDBName, signozSampleTableName)
var lastReceived int64
err := r.db.QueryRow(ctx, query, metricName).Scan(&lastReceived)
if err != nil {
return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
}
return lastReceived, nil
}
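
The split above separates two concerns: data points can now be answered from the 30-minute aggregate table (a sum of pre-aggregated counts), while the last-received timestamp still needs max(unix_milli) from the raw samples table. A minimal sketch of that shape, with a stubbed scanner and illustrative table names standing in for the real ClickHouse client:

package main

import (
	"context"
	"fmt"
)

// scanOneFn abstracts "run this query with one argument and scan one row".
type scanOneFn func(ctx context.Context, query string, arg any, dest any) error

// getDataPoints sums pre-aggregated counts; the table name is illustrative.
func getDataPoints(ctx context.Context, scan scanOneFn, metric string) (uint64, error) {
	q := `SELECT sum(count) AS data_points FROM signoz_metrics.samples_v4_agg_30m WHERE metric_name = ?`
	var dataPoints uint64
	if err := scan(ctx, q, metric, &dataPoints); err != nil {
		return 0, err
	}
	return dataPoints, nil
}

// getLastReceived reads the newest raw sample timestamp; the table name is illustrative.
func getLastReceived(ctx context.Context, scan scanOneFn, metric string) (int64, error) {
	q := `SELECT max(unix_milli) AS last_received_time FROM signoz_metrics.samples_v4 WHERE metric_name = ?`
	var last int64
	if err := scan(ctx, q, metric, &last); err != nil {
		return 0, err
	}
	return last, nil
}

func main() {
	// Stub scanner so the sketch runs without a database.
	stub := func(_ context.Context, _ string, _ any, dest any) error {
		switch d := dest.(type) {
		case *uint64:
			*d = 12345
		case *int64:
			*d = 1700000000000
		}
		return nil
	}
	dp, _ := getDataPoints(context.Background(), stub, "http_requests_total")
	last, _ := getLastReceived(context.Background(), stub, "http_requests_total")
	fmt.Println(dp, last)
}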
func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) {
@@ -5854,7 +5871,9 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
orderByClauseFirstQuery = ""
}
sampleQuery := fmt.Sprintf(
var sb strings.Builder
sb.WriteString(fmt.Sprintf(
`SELECT
s.samples,
s.metric_name,
@@ -5865,7 +5884,12 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
%s AS samples,
max(unix_milli) as unix_milli
FROM %s.%s
WHERE fingerprint IN (
`, countExp, signozMetricDBName, sampleTable))
// Conditionally add the fingerprint subquery if `whereClause` is present
if whereClause != "" {
sb.WriteString(fmt.Sprintf(
`WHERE fingerprint IN (
SELECT fingerprint
FROM %s.%s
WHERE unix_milli BETWEEN ? AND ?
@@ -5873,14 +5897,22 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
AND metric_name IN (%s)
GROUP BY fingerprint
)
AND metric_name in (%s)
GROUP BY metric_name
) AS s
%s
LIMIT %d OFFSET %d;`,
countExp, signozMetricDBName, sampleTable, signozMetricDBName, localTsTable,
whereClause, metricsList, metricsList, orderByClauseFirstQuery,
req.Limit, req.Offset)
AND metric_name IN (%s) `,
signozMetricDBName, localTsTable, whereClause, metricsList, metricsList))
} else {
sb.WriteString(fmt.Sprintf(
`WHERE metric_name IN (%s) `, metricsList))
}
sb.WriteString(`GROUP BY metric_name ) AS s `)
if orderByClauseFirstQuery != "" {
sb.WriteString(orderByClauseFirstQuery)
}
sb.WriteString(fmt.Sprintf(" LIMIT %d OFFSET %d;", req.Limit, req.Offset))
sampleQuery := sb.String()
args = append(args, start, end)
rows, err = r.db.Query(valueCtx, sampleQuery, args...)
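
The strings.Builder assembly above only emits the fingerprint subquery when a filter is present, so unfiltered listings skip the extra scan of the local time-series table. A self-contained sketch of the same pattern, with placeholder database, table, and column names:

package main

import (
	"fmt"
	"strings"
)

// buildSampleQuery mirrors the conditional assembly: the fingerprint subquery
// (and its time-series lookup) is appended only when a filter is present.
func buildSampleQuery(whereClause, metricsList, orderBy string, limit, offset int) string {
	var sb strings.Builder
	sb.WriteString(`SELECT s.samples, s.metric_name FROM (SELECT metric_name, sum(count) AS samples FROM signoz_metrics.samples_agg `)
	if whereClause != "" {
		sb.WriteString(fmt.Sprintf(
			`WHERE fingerprint IN (SELECT fingerprint FROM signoz_metrics.time_series WHERE unix_milli BETWEEN ? AND ? AND %s AND metric_name IN (%s) GROUP BY fingerprint) AND metric_name IN (%s) `,
			whereClause, metricsList, metricsList))
	} else {
		sb.WriteString(fmt.Sprintf(`WHERE metric_name IN (%s) `, metricsList))
	}
	sb.WriteString(`GROUP BY metric_name) AS s `)
	if orderBy != "" {
		sb.WriteString(orderBy + " ")
	}
	sb.WriteString(fmt.Sprintf("LIMIT %d OFFSET %d;", limit, offset))
	return sb.String()
}

func main() {
	// Unfiltered: no fingerprint subquery, so no BETWEEN placeholders are emitted.
	fmt.Println(buildSampleQuery("", "'http_requests_total'", "ORDER BY s.samples DESC", 10, 0))
}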
@@ -6063,8 +6095,10 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
// Format metric names for query
metricsList := "'" + strings.Join(metricNames, "', '") + "'"
// Construct the sample percentage query
sampleQuery := fmt.Sprintf(
// Build query using string builder for better performance
var sb strings.Builder
sb.WriteString(fmt.Sprintf(
`WITH TotalSamples AS (
SELECT %s AS total_samples
FROM %s.%s
@@ -6079,9 +6113,15 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
SELECT
metric_name,
%s AS samples
FROM %s.%s
WHERE fingerprint IN
(
FROM %s.%s`,
countExp, signozMetricDBName, sampleTable, // Total samples
countExp, signozMetricDBName, sampleTable, // Inner select samples
))
// Conditionally add the fingerprint subquery if whereClause is present
if whereClause != "" {
sb.WriteString(fmt.Sprintf(
` WHERE fingerprint IN (
SELECT fingerprint
FROM %s.%s
WHERE unix_milli BETWEEN ? AND ?
@@ -6089,18 +6129,27 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
AND metric_name IN (%s)
GROUP BY fingerprint
)
AND metric_name IN (%s)
AND metric_name IN (%s)`,
signozMetricDBName, localTsTable, whereClause, metricsList,
metricsList,
))
} else {
sb.WriteString(fmt.Sprintf(
` WHERE metric_name IN (%s)`,
metricsList,
))
}
sb.WriteString(`
GROUP BY metric_name
) AS s
JOIN TotalSamples t ON 1 = 1
ORDER BY percentage DESC
LIMIT %d;`,
countExp, signozMetricDBName, sampleTable, // Total samples
countExp, signozMetricDBName, sampleTable, // Inner select samples
signozMetricDBName, localTsTable, whereClause, metricsList, // Subquery conditions
metricsList, req.Limit, // Final conditions
)
LIMIT ?;`)
sampleQuery := sb.String()
// Add start and end time to args
args = append(args, start, end)
// Execute the sample percentage query
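
With the limit now bound as a "?" placeholder rather than formatted into the SQL, the argument list has to match the placeholder count exactly. A small generic helper, offered here only as an illustration, that can catch such mismatches before executing a query of this shape:

package main

import (
	"fmt"
	"strings"
)

// checkArgs is a sanity check: the number of '?' placeholders in the query
// must match the number of bound arguments. Counting is naive (a '?' inside a
// string literal would be miscounted), which is acceptable for a rough check.
func checkArgs(query string, args ...any) error {
	want := strings.Count(query, "?")
	if want != len(args) {
		return fmt.Errorf("query has %d placeholders but %d args were supplied", want, len(args))
	}
	return nil
}

func main() {
	q := "SELECT metric_name FROM samples WHERE unix_milli BETWEEN ? AND ? LIMIT ?"
	fmt.Println(checkArgs(q, int64(1000), int64(2000)))     // error: 3 placeholders, 2 args
	fmt.Println(checkArgs(q, int64(1000), int64(2000), 10)) // <nil>
}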

View File

@@ -219,7 +219,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2)
pvcsRepo := inframetrics.NewPvcsRepo(opts.Reader, querierv2)
//explorerCache := metricsexplorer.NewExplorerCache(metricsexplorer.WithCache(opts.Cache))
summaryService := metricsexplorer.NewSummaryService(opts.Reader, querierv2)
summaryService := metricsexplorer.NewSummaryService(opts.Reader, opts.RuleManager)
aH := &APIHandler{
reader: opts.Reader,

View File

@@ -3,6 +3,7 @@ package metricsexplorer
import (
"context"
"encoding/json"
"errors"
"time"
"go.uber.org/zap"
@@ -12,16 +13,17 @@ import (
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/rules"
"golang.org/x/sync/errgroup"
)
type SummaryService struct {
reader interfaces.Reader
querierV2 interfaces.Querier
rulesManager *rules.Manager
}
func NewSummaryService(reader interfaces.Reader, querierV2 interfaces.Querier) *SummaryService {
return &SummaryService{reader: reader, querierV2: querierV2}
func NewSummaryService(reader interfaces.Reader, alertManager *rules.Manager) *SummaryService {
return &SummaryService{reader: reader, rulesManager: alertManager}
}
func (receiver *SummaryService) FilterKeys(ctx context.Context, params *metrics_explorer.FilterKeyRequest) (*metrics_explorer.FilterKeyResponse, *model.ApiError) {
@@ -102,18 +104,24 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
return nil
})
// Call 2: GetMetricsDataPointsAndLastReceived
g.Go(func() error {
dataPoints, lastReceived, err := receiver.reader.GetMetricsDataPointsAndLastReceived(ctx, metricName)
dataPoints, err := receiver.reader.GetMetricsDataPoints(ctx, metricName)
if err != nil {
return err
}
metricDetailsDTO.Samples = dataPoints
return nil
})
g.Go(func() error {
lastReceived, err := receiver.reader.GetMetricsLastReceived(ctx, metricName)
if err != nil {
return err
}
metricDetailsDTO.LastReceived = lastReceived
return nil
})
// Call 3: GetTotalTimeSeriesForMetricName
g.Go(func() error {
totalSeries, err := receiver.reader.GetTotalTimeSeriesForMetricName(ctx, metricName)
if err != nil {
@@ -123,7 +131,6 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
return nil
})
// Call 4: GetActiveTimeSeriesForMetricName
g.Go(func() error {
activeSeries, err := receiver.reader.GetActiveTimeSeriesForMetricName(ctx, metricName, 120*time.Minute)
if err != nil {
@ -133,7 +140,6 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
return nil
})
// Call 5: GetAttributesForMetricName
g.Go(func() error {
attributes, err := receiver.reader.GetAttributesForMetricName(ctx, metricName)
if err != nil {
@ -145,7 +151,6 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
return nil
})
// Call 6: GetDashboardsWithMetricName
g.Go(func() error {
data, err := dashboards.GetDashboardsWithMetricName(ctx, metricName)
if err != nil {
@@ -169,13 +174,30 @@ func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricNam
return nil
})
g.Go(func() error {
var metrics []string
var metricsAlerts []metrics_explorer.Alert
metrics = append(metrics, metricName)
data, err := receiver.rulesManager.GetAlertDetailsForMetricNames(ctx, metrics)
if err != nil {
return err
}
if rulesLists, ok := data[metricName]; ok {
for _, rule := range rulesLists {
metricsAlerts = append(metricsAlerts, metrics_explorer.Alert{AlertName: rule.AlertName, AlertID: rule.Id})
}
}
metricDetailsDTO.Alerts = metricsAlerts
return nil
})
// Wait for all goroutines and handle any errors
if err := g.Wait(); err != nil {
// Type assert to check if it's already an ApiError
if apiErr, ok := err.(*model.ApiError); ok {
var apiErr *model.ApiError
if errors.As(err, &apiErr) {
return metrics_explorer.MetricDetailsDTO{}, apiErr
}
// If it's not an ApiError, wrap it in one
return metrics_explorer.MetricDetailsDTO{}, &model.ApiError{Typ: "InternalError", Err: err}
}
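
The summary handler fans the independent lookups out with errgroup: each goroutine fills its own field of the DTO, and errors.As maps the error from g.Wait() back to a *model.ApiError when one was returned. A minimal, self-contained sketch of that fan-out pattern with placeholder types and fetchers:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// metricSummary stands in for the real MetricDetailsDTO.
type metricSummary struct {
	Samples      uint64
	LastReceived int64
	Alerts       []string
}

// fetchSummary runs the independent lookups concurrently; each goroutine
// writes a distinct field, and the first error cancels the shared context.
func fetchSummary(ctx context.Context,
	samplesFn func(context.Context) (uint64, error),
	lastFn func(context.Context) (int64, error),
	alertsFn func(context.Context) ([]string, error),
) (metricSummary, error) {
	var out metricSummary
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		v, err := samplesFn(ctx)
		out.Samples = v
		return err
	})
	g.Go(func() error {
		v, err := lastFn(ctx)
		out.LastReceived = v
		return err
	})
	g.Go(func() error {
		v, err := alertsFn(ctx)
		out.Alerts = v
		return err
	})
	if err := g.Wait(); err != nil {
		return metricSummary{}, err
	}
	return out, nil
}

func main() {
	s, err := fetchSummary(context.Background(),
		func(context.Context) (uint64, error) { return 42, nil },
		func(context.Context) (int64, error) { return 1700000000000, nil },
		func(context.Context) ([]string, error) { return []string{"HighErrorRate"}, nil },
	)
	fmt.Println(s, err)
}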
@@ -194,14 +216,18 @@ func (receiver *SummaryService) GetMetricsTreemap(ctx context.Context, params *m
if apiError != nil {
return nil, apiError
}
if cardinality != nil {
response.TimeSeries = *cardinality
}
return &response, nil
case metrics_explorer.SamplesTreeMap:
dataPoints, apiError := receiver.reader.GetMetricsSamplesPercentage(ctx, params)
if apiError != nil {
return nil, apiError
}
if dataPoints != nil {
response.Samples = *dataPoints
}
return &response, nil
default:
return nil, nil

View File

@@ -122,7 +122,8 @@ type Reader interface {
GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError)
GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError)
GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError)
GetMetricsDataPoints(ctx context.Context, metricName string) (uint64, *model.ApiError)
GetMetricsLastReceived(ctx context.Context, metricName string) (int64, *model.ApiError)
GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError)
GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError)
GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError)
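
A hedged sketch of how a consumer could depend on just the two new Reader methods through a narrow local interface, which keeps test fakes small; apiError below is a stand-in for the real *model.ApiError:

package main

import (
	"context"
	"fmt"
)

// apiError is a placeholder for *model.ApiError.
type apiError struct {
	Typ string
	Err error
}

// metricStatsReader narrows the Reader interface to the two new methods.
type metricStatsReader interface {
	GetMetricsDataPoints(ctx context.Context, metricName string) (uint64, *apiError)
	GetMetricsLastReceived(ctx context.Context, metricName string) (int64, *apiError)
}

// fakeReader returns canned values, as a test double might.
type fakeReader struct{}

func (fakeReader) GetMetricsDataPoints(context.Context, string) (uint64, *apiError) {
	return 1234, nil
}

func (fakeReader) GetMetricsLastReceived(context.Context, string) (int64, *apiError) {
	return 1700000000000, nil
}

func main() {
	var r metricStatsReader = fakeReader{}
	dp, _ := r.GetMetricsDataPoints(context.Background(), "http_requests_total")
	last, _ := r.GetMetricsLastReceived(context.Background(), "http_requests_total")
	fmt.Println(dp, last)
}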

View File

@@ -89,7 +89,7 @@ type MetricDetailsDTO struct {
Samples uint64 `json:"samples"`
TimeSeriesTotal uint64 `json:"timeSeriesTotal"`
TimeSeriesActive uint64 `json:"timeSeriesActive"`
LastReceived uint64 `json:"lastReceived"`
LastReceived int64 `json:"lastReceived"`
Attributes []Attribute `json:"attributes"`
Metadata Metadata `json:"metadata"`
Alerts []Alert `json:"alerts"`

View File

@@ -829,3 +829,77 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m
return alertCount, apiErr
}
func (m *Manager) GetAlertDetailsForMetricNames(ctx context.Context, metricNames []string) (map[string][]GettableRule, error) {
result := make(map[string][]GettableRule)
rules, err := m.ruleDB.GetStoredRules(ctx)
if err != nil {
zap.L().Error("Error getting stored rules", zap.Error(err))
return nil, err
}
metricRulesMap := make(map[string][]GettableRule)
for _, storedRule := range rules {
var rule GettableRule
if err := json.Unmarshal([]byte(storedRule.Data), &rule); err != nil {
zap.L().Error("Invalid rule data", zap.Error(err))
continue
}
if rule.AlertType != AlertTypeMetric || rule.RuleCondition == nil || rule.RuleCondition.CompositeQuery == nil {
continue
}
rule.Id = fmt.Sprintf("%d", storedRule.Id)
rule.CreatedAt = storedRule.CreatedAt
rule.CreatedBy = storedRule.CreatedBy
rule.UpdatedAt = storedRule.UpdatedAt
rule.UpdatedBy = storedRule.UpdatedBy
for _, query := range rule.RuleCondition.CompositeQuery.BuilderQueries {
if query.AggregateAttribute.Key != "" {
metricRulesMap[query.AggregateAttribute.Key] = append(metricRulesMap[query.AggregateAttribute.Key], rule)
}
}
for _, query := range rule.RuleCondition.CompositeQuery.PromQueries {
if query.Query != "" {
for _, metricName := range metricNames {
if strings.Contains(query.Query, metricName) {
metricRulesMap[metricName] = append(metricRulesMap[metricName], rule)
}
}
}
}
for _, query := range rule.RuleCondition.CompositeQuery.ClickHouseQueries {
if query.Query != "" {
for _, metricName := range metricNames {
if strings.Contains(query.Query, metricName) {
metricRulesMap[metricName] = append(metricRulesMap[metricName], rule)
}
}
}
}
}
for _, metricName := range metricNames {
if rules, exists := metricRulesMap[metricName]; exists {
seen := make(map[string]bool)
uniqueRules := make([]GettableRule, 0)
for _, rule := range rules {
if !seen[rule.Id] {
seen[rule.Id] = true
uniqueRules = append(uniqueRules, rule)
}
}
result[metricName] = uniqueRules
}
}
return result, nil
}
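
A hedged usage sketch of the map returned by GetAlertDetailsForMetricNames: the caller (as the summary service does above) picks the rules recorded for one metric and keeps only the alert name and ID. The local types are placeholders for GettableRule and metrics_explorer.Alert:

package main

import "fmt"

// gettableRule and alertRef are placeholders for GettableRule and
// metrics_explorer.Alert.
type gettableRule struct {
	Id        string
	AlertName string
}

type alertRef struct {
	AlertID   string
	AlertName string
}

// alertsForMetric turns the per-metric rule map into lightweight alert
// references for a single metric.
func alertsForMetric(byMetric map[string][]gettableRule, metric string) []alertRef {
	rules, ok := byMetric[metric]
	if !ok {
		return nil
	}
	out := make([]alertRef, 0, len(rules))
	for _, r := range rules {
		out = append(out, alertRef{AlertID: r.Id, AlertName: r.AlertName})
	}
	return out
}

func main() {
	byMetric := map[string][]gettableRule{
		"http_requests_total": {{Id: "12", AlertName: "HighErrorRate"}},
	}
	fmt.Println(alertsForMetric(byMetric, "http_requests_total"))
}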