diff --git a/ee/query-service/anomaly/daily.go b/ee/query-service/anomaly/daily.go index 2d40974927..bbafe1618e 100644 --- a/ee/query-service/anomaly/daily.go +++ b/ee/query-service/anomaly/daily.go @@ -2,6 +2,9 @@ package anomaly import ( "context" + + querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" ) type DailyProvider struct { @@ -24,9 +27,18 @@ func NewDailyProvider(opts ...GenericProviderOption[*DailyProvider]) *DailyProvi opt(dp) } + dp.querierV2 = querierV2.NewQuerier(querierV2.QuerierOptions{ + Reader: dp.reader, + Cache: dp.cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: dp.fluxInterval, + FeatureLookup: dp.ff, + }) + return dp } func (p *DailyProvider) GetAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) { - return nil, nil + req.Seasonality = SeasonalityDaily + return p.getAnomalies(ctx, req) } diff --git a/ee/query-service/anomaly/hourly.go b/ee/query-service/anomaly/hourly.go index b3af3d01d8..1ee08655f0 100644 --- a/ee/query-service/anomaly/hourly.go +++ b/ee/query-service/anomaly/hourly.go @@ -2,6 +2,9 @@ package anomaly import ( "context" + + querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" ) type HourlyProvider struct { @@ -24,9 +27,18 @@ func NewHourlyProvider(opts ...GenericProviderOption[*HourlyProvider]) *HourlyPr opt(hp) } + hp.querierV2 = querierV2.NewQuerier(querierV2.QuerierOptions{ + Reader: hp.reader, + Cache: hp.cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: hp.fluxInterval, + FeatureLookup: hp.ff, + }) + return hp } func (p *HourlyProvider) GetAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) { - return nil, nil + req.Seasonality = SeasonalityHourly + return p.getAnomalies(ctx, req) } diff --git a/ee/query-service/anomaly/params.go b/ee/query-service/anomaly/params.go index 009374bf62..d39b2fa80f 100644 --- a/ee/query-service/anomaly/params.go +++ b/ee/query-service/anomaly/params.go @@ -16,6 +16,13 @@ const ( SeasonalityWeekly Seasonality = "weekly" ) +var ( + oneWeekOffset = 24 * 7 * time.Hour.Milliseconds() + oneDayOffset = 24 * time.Hour.Milliseconds() + oneHourOffset = time.Hour.Milliseconds() + fiveMinOffset = 5 * time.Minute.Milliseconds() +) + func (s Seasonality) IsValid() bool { switch s { case SeasonalityHourly, SeasonalityDaily, SeasonalityWeekly: @@ -35,7 +42,7 @@ type GetAnomaliesResponse struct { } // anomalyParams is the params for anomaly detection -// prediction = avg(past_period_query) + avg(current_season_query) - avg(past_season_query) +// prediction = avg(past_period_query) + avg(current_season_query) - mean(past_season_query, past2_season_query, past3_season_query) // // ^ ^ // | | @@ -49,9 +56,9 @@ type anomalyQueryParams struct { // and to detect anomalies CurrentPeriodQuery *v3.QueryRangeParamsV3 // PastPeriodQuery is the query range params for past seasonal period - // Example: For weekly seasonality, (now-1w-4h-5m, now-1w) - // : For daily seasonality, (now-1d-2h-5m, now-1d) - // : For hourly seasonality, (now-1h-30m-5m, now-1h) + // Example: For weekly seasonality, (now-1w-5m, now-1w) + // : For daily seasonality, (now-1d-5m, now-1d) + // : For hourly seasonality, (now-1h-5m, now-1h) PastPeriodQuery *v3.QueryRangeParamsV3 // CurrentSeasonQuery is the query range params for current period (seasonal) // Example: For weekly seasonality, this is the query range params for the (now-1w-5m, now) @@ -63,16 +70,17 @@ type anomalyQueryParams struct { // : For daily seasonality, this is the query range params for the (now-2d-5m, now-1d) // : For hourly seasonality, this is the query range params for the (now-2h-5m, now-1h) PastSeasonQuery *v3.QueryRangeParamsV3 -} -func copyCompositeQuery(req *v3.QueryRangeParamsV3) *v3.CompositeQuery { - deepCopyCompositeQuery := *req.CompositeQuery - deepCopyCompositeQuery.BuilderQueries = make(map[string]*v3.BuilderQuery) - for k, v := range req.CompositeQuery.BuilderQueries { - query := *v - deepCopyCompositeQuery.BuilderQueries[k] = &query - } - return &deepCopyCompositeQuery + // Past2SeasonQuery is the query range params for past 2 seasonal period to the current season + // Example: For weekly seasonality, this is the query range params for the (now-3w-5m, now-2w) + // : For daily seasonality, this is the query range params for the (now-3d-5m, now-2d) + // : For hourly seasonality, this is the query range params for the (now-3h-5m, now-2h) + Past2SeasonQuery *v3.QueryRangeParamsV3 + // Past3SeasonQuery is the query range params for past 3 seasonal period to the current season + // Example: For weekly seasonality, this is the query range params for the (now-4w-5m, now-3w) + // : For daily seasonality, this is the query range params for the (now-4d-5m, now-3d) + // : For hourly seasonality, this is the query range params for the (now-4h-5m, now-3h) + Past3SeasonQuery *v3.QueryRangeParamsV3 } func updateStepInterval(req *v3.QueryRangeParamsV3) { @@ -95,7 +103,7 @@ func prepareAnomalyQueryParams(req *v3.QueryRangeParamsV3, seasonality Seasonali currentPeriodQuery := &v3.QueryRangeParamsV3{ Start: start, End: end, - CompositeQuery: req.CompositeQuery, + CompositeQuery: req.CompositeQuery.Clone(), Variables: make(map[string]interface{}, 0), NoCache: false, } @@ -104,24 +112,24 @@ func prepareAnomalyQueryParams(req *v3.QueryRangeParamsV3, seasonality Seasonali var pastPeriodStart, pastPeriodEnd int64 switch seasonality { - // for one week period, we fetch the data from the past week with 4 hours offset + // for one week period, we fetch the data from the past week with 5 min offset case SeasonalityWeekly: - pastPeriodStart = start - 166*time.Hour.Milliseconds() - 4*time.Hour.Milliseconds() - pastPeriodEnd = end - 166*time.Hour.Milliseconds() - // for one day period, we fetch the data from the past day with 2 hours offset + pastPeriodStart = start - oneWeekOffset - fiveMinOffset + pastPeriodEnd = end - oneWeekOffset + // for one day period, we fetch the data from the past day with 5 min offset case SeasonalityDaily: - pastPeriodStart = start - 23*time.Hour.Milliseconds() - 2*time.Hour.Milliseconds() - pastPeriodEnd = end - 23*time.Hour.Milliseconds() - // for one hour period, we fetch the data from the past hour with 30 minutes offset + pastPeriodStart = start - oneDayOffset - fiveMinOffset + pastPeriodEnd = end - oneDayOffset + // for one hour period, we fetch the data from the past hour with 5 min offset case SeasonalityHourly: - pastPeriodStart = start - 1*time.Hour.Milliseconds() - 30*time.Minute.Milliseconds() - pastPeriodEnd = end - 1*time.Hour.Milliseconds() + pastPeriodStart = start - oneHourOffset - fiveMinOffset + pastPeriodEnd = end - oneHourOffset } pastPeriodQuery := &v3.QueryRangeParamsV3{ Start: pastPeriodStart, End: pastPeriodEnd, - CompositeQuery: copyCompositeQuery(req), + CompositeQuery: req.CompositeQuery.Clone(), Variables: make(map[string]interface{}, 0), NoCache: false, } @@ -131,20 +139,20 @@ func prepareAnomalyQueryParams(req *v3.QueryRangeParamsV3, seasonality Seasonali var currentGrowthPeriodStart, currentGrowthPeriodEnd int64 switch seasonality { case SeasonalityWeekly: - currentGrowthPeriodStart = start - 7*24*time.Hour.Milliseconds() + currentGrowthPeriodStart = start - oneWeekOffset currentGrowthPeriodEnd = end case SeasonalityDaily: - currentGrowthPeriodStart = start - 23*time.Hour.Milliseconds() + currentGrowthPeriodStart = start - oneDayOffset currentGrowthPeriodEnd = end case SeasonalityHourly: - currentGrowthPeriodStart = start - 1*time.Hour.Milliseconds() + currentGrowthPeriodStart = start - oneHourOffset currentGrowthPeriodEnd = end } currentGrowthQuery := &v3.QueryRangeParamsV3{ Start: currentGrowthPeriodStart, End: currentGrowthPeriodEnd, - CompositeQuery: copyCompositeQuery(req), + CompositeQuery: req.CompositeQuery.Clone(), Variables: make(map[string]interface{}, 0), NoCache: false, } @@ -153,30 +161,76 @@ func prepareAnomalyQueryParams(req *v3.QueryRangeParamsV3, seasonality Seasonali var pastGrowthPeriodStart, pastGrowthPeriodEnd int64 switch seasonality { case SeasonalityWeekly: - pastGrowthPeriodStart = start - 14*24*time.Hour.Milliseconds() - pastGrowthPeriodEnd = start - 7*24*time.Hour.Milliseconds() + pastGrowthPeriodStart = start - 2*oneWeekOffset + pastGrowthPeriodEnd = start - 1*oneWeekOffset case SeasonalityDaily: - pastGrowthPeriodStart = start - 2*time.Hour.Milliseconds() - pastGrowthPeriodEnd = start - 1*time.Hour.Milliseconds() + pastGrowthPeriodStart = start - 2*oneDayOffset + pastGrowthPeriodEnd = start - 1*oneDayOffset case SeasonalityHourly: - pastGrowthPeriodStart = start - 2*time.Hour.Milliseconds() - pastGrowthPeriodEnd = start - 1*time.Hour.Milliseconds() + pastGrowthPeriodStart = start - 2*oneHourOffset + pastGrowthPeriodEnd = start - 1*oneHourOffset } pastGrowthQuery := &v3.QueryRangeParamsV3{ Start: pastGrowthPeriodStart, End: pastGrowthPeriodEnd, - CompositeQuery: copyCompositeQuery(req), + CompositeQuery: req.CompositeQuery.Clone(), Variables: make(map[string]interface{}, 0), NoCache: false, } updateStepInterval(pastGrowthQuery) + var past2GrowthPeriodStart, past2GrowthPeriodEnd int64 + switch seasonality { + case SeasonalityWeekly: + past2GrowthPeriodStart = start - 3*oneWeekOffset + past2GrowthPeriodEnd = start - 2*oneWeekOffset + case SeasonalityDaily: + past2GrowthPeriodStart = start - 3*oneDayOffset + past2GrowthPeriodEnd = start - 2*oneDayOffset + case SeasonalityHourly: + past2GrowthPeriodStart = start - 3*oneHourOffset + past2GrowthPeriodEnd = start - 2*oneHourOffset + } + + past2GrowthQuery := &v3.QueryRangeParamsV3{ + Start: past2GrowthPeriodStart, + End: past2GrowthPeriodEnd, + CompositeQuery: req.CompositeQuery.Clone(), + Variables: make(map[string]interface{}, 0), + NoCache: false, + } + updateStepInterval(past2GrowthQuery) + + var past3GrowthPeriodStart, past3GrowthPeriodEnd int64 + switch seasonality { + case SeasonalityWeekly: + past3GrowthPeriodStart = start - 4*oneWeekOffset + past3GrowthPeriodEnd = start - 3*oneWeekOffset + case SeasonalityDaily: + past3GrowthPeriodStart = start - 4*oneDayOffset + past3GrowthPeriodEnd = start - 3*oneDayOffset + case SeasonalityHourly: + past3GrowthPeriodStart = start - 4*oneHourOffset + past3GrowthPeriodEnd = start - 3*oneHourOffset + } + + past3GrowthQuery := &v3.QueryRangeParamsV3{ + Start: past3GrowthPeriodStart, + End: past3GrowthPeriodEnd, + CompositeQuery: req.CompositeQuery.Clone(), + Variables: make(map[string]interface{}, 0), + NoCache: false, + } + updateStepInterval(past3GrowthQuery) + return &anomalyQueryParams{ CurrentPeriodQuery: currentPeriodQuery, PastPeriodQuery: pastPeriodQuery, CurrentSeasonQuery: currentGrowthQuery, PastSeasonQuery: pastGrowthQuery, + Past2SeasonQuery: past2GrowthQuery, + Past3SeasonQuery: past3GrowthQuery, } } @@ -185,4 +239,6 @@ type anomalyQueryResults struct { PastPeriodResults []*v3.Result CurrentSeasonResults []*v3.Result PastSeasonResults []*v3.Result + Past2SeasonResults []*v3.Result + Past3SeasonResults []*v3.Result } diff --git a/ee/query-service/anomaly/seasonal.go b/ee/query-service/anomaly/seasonal.go index ec95804fbe..485ab7f460 100644 --- a/ee/query-service/anomaly/seasonal.go +++ b/ee/query-service/anomaly/seasonal.go @@ -3,14 +3,21 @@ package anomaly import ( "context" "math" + "time" "go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/interfaces" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" "go.signoz.io/signoz/pkg/query-service/utils/labels" "go.uber.org/zap" ) +var ( + // TODO(srikanthccv): make this configurable? + movingAvgWindowSize = 7 +) + // BaseProvider is an interface that includes common methods for all provider types type BaseProvider interface { GetBaseSeasonalProvider() *BaseSeasonalProvider @@ -46,6 +53,7 @@ func WithReader[T BaseProvider](reader interfaces.Reader) GenericProviderOption[ type BaseSeasonalProvider struct { querierV2 interfaces.Querier reader interfaces.Reader + fluxInterval time.Duration cache cache.Cache keyGenerator cache.KeyGenerator ff interfaces.FeatureLookup @@ -53,28 +61,68 @@ type BaseSeasonalProvider struct { func (p *BaseSeasonalProvider) getQueryParams(req *GetAnomaliesRequest) *anomalyQueryParams { if !req.Seasonality.IsValid() { - req.Seasonality = SeasonalityWeekly + req.Seasonality = SeasonalityDaily } return prepareAnomalyQueryParams(req.Params, req.Seasonality) } func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQueryParams) (*anomalyQueryResults, error) { - currentPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentPeriodQuery, nil) + currentPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentPeriodQuery) if err != nil { return nil, err } - pastPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.PastPeriodQuery, nil) + currentPeriodResults, err = postprocess.PostProcessResult(currentPeriodResults, params.CurrentPeriodQuery) if err != nil { return nil, err } - currentSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentSeasonQuery, nil) + pastPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.PastPeriodQuery) if err != nil { return nil, err } - pastSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.PastSeasonQuery, nil) + pastPeriodResults, err = postprocess.PostProcessResult(pastPeriodResults, params.PastPeriodQuery) + if err != nil { + return nil, err + } + + currentSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentSeasonQuery) + if err != nil { + return nil, err + } + + currentSeasonResults, err = postprocess.PostProcessResult(currentSeasonResults, params.CurrentSeasonQuery) + if err != nil { + return nil, err + } + + pastSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.PastSeasonQuery) + if err != nil { + return nil, err + } + + pastSeasonResults, err = postprocess.PostProcessResult(pastSeasonResults, params.PastSeasonQuery) + if err != nil { + return nil, err + } + + past2SeasonResults, _, err := p.querierV2.QueryRange(ctx, params.Past2SeasonQuery) + if err != nil { + return nil, err + } + + past2SeasonResults, err = postprocess.PostProcessResult(past2SeasonResults, params.Past2SeasonQuery) + if err != nil { + return nil, err + } + + past3SeasonResults, _, err := p.querierV2.QueryRange(ctx, params.Past3SeasonQuery) + if err != nil { + return nil, err + } + + past3SeasonResults, err = postprocess.PostProcessResult(past3SeasonResults, params.Past3SeasonQuery) if err != nil { return nil, err } @@ -84,10 +132,18 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu PastPeriodResults: pastPeriodResults, CurrentSeasonResults: currentSeasonResults, PastSeasonResults: pastSeasonResults, + Past2SeasonResults: past2SeasonResults, + Past3SeasonResults: past3SeasonResults, }, nil } +// getMatchingSeries gets the matching series from the query result +// for the given series func (p *BaseSeasonalProvider) getMatchingSeries(queryResult *v3.Result, series *v3.Series) *v3.Series { + if queryResult == nil || len(queryResult.Series) == 0 { + return nil + } + for _, curr := range queryResult.Series { currLabels := labels.FromMap(curr.Labels) seriesLabels := labels.FromMap(series.Labels) @@ -99,6 +155,9 @@ func (p *BaseSeasonalProvider) getMatchingSeries(queryResult *v3.Result, series } func (p *BaseSeasonalProvider) getAvg(series *v3.Series) float64 { + if series == nil || len(series.Points) == 0 { + return 0 + } var sum float64 for _, smpl := range series.Points { sum += smpl.Value @@ -107,6 +166,9 @@ func (p *BaseSeasonalProvider) getAvg(series *v3.Series) float64 { } func (p *BaseSeasonalProvider) getStdDev(series *v3.Series) float64 { + if series == nil || len(series.Points) == 0 { + return 0 + } avg := p.getAvg(series) var sum float64 for _, smpl := range series.Points { @@ -115,15 +177,65 @@ func (p *BaseSeasonalProvider) getStdDev(series *v3.Series) float64 { return math.Sqrt(sum / float64(len(series.Points))) } -func (p *BaseSeasonalProvider) getPredictedSeries(series, prevSeries, currentSeasonSeries, pastSeasonSeries *v3.Series) *v3.Series { +// getMovingAvg gets the moving average for the given series +// for the given window size and start index +func (p *BaseSeasonalProvider) getMovingAvg(series *v3.Series, movingAvgWindowSize, startIdx int) float64 { + if series == nil || len(series.Points) == 0 { + return 0 + } + if startIdx >= len(series.Points)-movingAvgWindowSize { + startIdx = len(series.Points) - movingAvgWindowSize + } + var sum float64 + points := series.Points[startIdx:] + for i := 0; i < movingAvgWindowSize && i < len(points); i++ { + sum += points[i].Value + } + avg := sum / float64(movingAvgWindowSize) + return avg +} + +func (p *BaseSeasonalProvider) getMean(floats ...float64) float64 { + if len(floats) == 0 { + return 0 + } + var sum float64 + for _, f := range floats { + sum += f + } + return sum / float64(len(floats)) +} + +func (p *BaseSeasonalProvider) getPredictedSeries( + series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *v3.Series, +) *v3.Series { predictedSeries := &v3.Series{ Labels: series.Labels, LabelsArray: series.LabelsArray, Points: []v3.Point{}, } - for _, curr := range series.Points { - predictedValue := p.getAvg(prevSeries) + p.getAvg(currentSeasonSeries) - p.getAvg(pastSeasonSeries) + // for each point in the series, get the predicted value + // the predicted value is the moving average (with window size = 7) of the previous period series + // plus the average of the current season series + // minus the mean of the past season series, past2 season series and past3 season series + for idx, curr := range series.Points { + predictedValue := + p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + + p.getAvg(currentSeasonSeries) - + p.getMean(p.getAvg(pastSeasonSeries), p.getAvg(past2SeasonSeries), p.getAvg(past3SeasonSeries)) + + if predictedValue < 0 { + predictedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + } + + zap.L().Info("predictedSeries", + zap.Float64("movingAvg", p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)), + zap.Float64("avg", p.getAvg(currentSeasonSeries)), + zap.Float64("mean", p.getMean(p.getAvg(pastSeasonSeries), p.getAvg(past2SeasonSeries), p.getAvg(past3SeasonSeries))), + zap.Any("labels", series.Labels), + zap.Float64("predictedValue", predictedValue), + ) predictedSeries.Points = append(predictedSeries.Points, v3.Point{ Timestamp: curr.Timestamp, Value: predictedValue, @@ -133,33 +245,80 @@ func (p *BaseSeasonalProvider) getPredictedSeries(series, prevSeries, currentSea return predictedSeries } -func (p *BaseSeasonalProvider) getExpectedValue(_, prevSeries, currentSeasonSeries, pastSeasonSeries *v3.Series) float64 { - prevSeriesAvg := p.getAvg(prevSeries) - currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries) - pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries) - zap.L().Debug("getExpectedValue", - zap.Float64("prevSeriesAvg", prevSeriesAvg), - zap.Float64("currentSeasonSeriesAvg", currentSeasonSeriesAvg), - zap.Float64("pastSeasonSeriesAvg", pastSeasonSeriesAvg), - zap.Float64("expectedValue", prevSeriesAvg+currentSeasonSeriesAvg-pastSeasonSeriesAvg), - ) - return prevSeriesAvg + currentSeasonSeriesAvg - pastSeasonSeriesAvg +// getBounds gets the upper and lower bounds for the given series +// for the given z score threshold +// moving avg of the previous period series + z score threshold * std dev of the series +// moving avg of the previous period series - z score threshold * std dev of the series +func (p *BaseSeasonalProvider) getBounds( + series, prevSeries, _, _, _, _ *v3.Series, + zScoreThreshold float64, +) (*v3.Series, *v3.Series) { + upperBoundSeries := &v3.Series{ + Labels: series.Labels, + LabelsArray: series.LabelsArray, + Points: []v3.Point{}, + } + + lowerBoundSeries := &v3.Series{ + Labels: series.Labels, + LabelsArray: series.LabelsArray, + Points: []v3.Point{}, + } + + for idx, curr := range series.Points { + upperBound := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series) + lowerBound := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series) + upperBoundSeries.Points = append(upperBoundSeries.Points, v3.Point{ + Timestamp: curr.Timestamp, + Value: upperBound, + }) + lowerBoundSeries.Points = append(lowerBoundSeries.Points, v3.Point{ + Timestamp: curr.Timestamp, + Value: math.Max(lowerBound, 0), + }) + } + + return upperBoundSeries, lowerBoundSeries } -func (p *BaseSeasonalProvider) getScore(series, prevSeries, weekSeries, weekPrevSeries *v3.Series, value float64) float64 { - expectedValue := p.getExpectedValue(series, prevSeries, weekSeries, weekPrevSeries) +// getExpectedValue gets the expected value for the given series +// for the given index +// prevSeriesAvg + currentSeasonSeriesAvg - mean of past season series, past2 season series and past3 season series +func (p *BaseSeasonalProvider) getExpectedValue( + _, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *v3.Series, idx int, +) float64 { + prevSeriesAvg := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries) + pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries) + past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries) + past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries) + return prevSeriesAvg + currentSeasonSeriesAvg - p.getMean(pastSeasonSeriesAvg, past2SeasonSeriesAvg, past3SeasonSeriesAvg) +} + +// getScore gets the anomaly score for the given series +// for the given index +// (value - expectedValue) / std dev of the series +func (p *BaseSeasonalProvider) getScore( + series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries *v3.Series, value float64, idx int, +) float64 { + expectedValue := p.getExpectedValue(series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries, idx) return (value - expectedValue) / p.getStdDev(weekSeries) } -func (p *BaseSeasonalProvider) getAnomalyScores(series, prevSeries, currentSeasonSeries, pastSeasonSeries *v3.Series) *v3.Series { +// getAnomalyScores gets the anomaly scores for the given series +// for the given index +// (value - expectedValue) / std dev of the series +func (p *BaseSeasonalProvider) getAnomalyScores( + series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *v3.Series, +) *v3.Series { anomalyScoreSeries := &v3.Series{ Labels: series.Labels, LabelsArray: series.LabelsArray, Points: []v3.Point{}, } - for _, curr := range series.Points { - anomalyScore := p.getScore(series, prevSeries, currentSeasonSeries, pastSeasonSeries, curr.Value) + for idx, curr := range series.Points { + anomalyScore := p.getScore(series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries, curr.Value, idx) anomalyScoreSeries.Points = append(anomalyScoreSeries.Points, v3.Point{ Timestamp: curr.Timestamp, Value: anomalyScore, @@ -169,7 +328,7 @@ func (p *BaseSeasonalProvider) getAnomalyScores(series, prevSeries, currentSeaso return anomalyScoreSeries } -func (p *BaseSeasonalProvider) GetAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) { +func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) { anomalyParams := p.getQueryParams(req) anomalyQueryResults, err := p.getResults(ctx, anomalyParams) if err != nil { @@ -196,7 +355,32 @@ func (p *BaseSeasonalProvider) GetAnomalies(ctx context.Context, req *GetAnomali pastSeasonResultsMap[result.QueryName] = result } + past2SeasonResultsMap := make(map[string]*v3.Result) + for _, result := range anomalyQueryResults.Past2SeasonResults { + past2SeasonResultsMap[result.QueryName] = result + } + + past3SeasonResultsMap := make(map[string]*v3.Result) + for _, result := range anomalyQueryResults.Past3SeasonResults { + past3SeasonResultsMap[result.QueryName] = result + } + for _, result := range currentPeriodResultsMap { + funcs := req.Params.CompositeQuery.BuilderQueries[result.QueryName].Functions + + var zScoreThreshold float64 + for _, f := range funcs { + if f.Name == v3.FunctionNameAnomaly { + value, ok := f.NamedArgs["z_score_threshold"] + if ok { + zScoreThreshold = value.(float64) + } else { + zScoreThreshold = 3 + } + break + } + } + pastPeriodResult, ok := pastPeriodResultsMap[result.QueryName] if !ok { continue @@ -209,21 +393,72 @@ func (p *BaseSeasonalProvider) GetAnomalies(ctx context.Context, req *GetAnomali if !ok { continue } + past2SeasonResult, ok := past2SeasonResultsMap[result.QueryName] + if !ok { + continue + } + past3SeasonResult, ok := past3SeasonResultsMap[result.QueryName] + if !ok { + continue + } for _, series := range result.Series { + stdDev := p.getStdDev(series) + zap.L().Info("stdDev", zap.Float64("stdDev", stdDev), zap.Any("labels", series.Labels)) + pastPeriodSeries := p.getMatchingSeries(pastPeriodResult, series) currentSeasonSeries := p.getMatchingSeries(currentSeasonResult, series) pastSeasonSeries := p.getMatchingSeries(pastSeasonResult, series) + past2SeasonSeries := p.getMatchingSeries(past2SeasonResult, series) + past3SeasonSeries := p.getMatchingSeries(past3SeasonResult, series) - predictedSeries := p.getPredictedSeries(series, pastPeriodSeries, currentSeasonSeries, pastSeasonSeries) + prevSeriesAvg := p.getAvg(pastPeriodSeries) + currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries) + pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries) + past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries) + past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries) + zap.L().Info("getAvg", zap.Float64("prevSeriesAvg", prevSeriesAvg), zap.Float64("currentSeasonSeriesAvg", currentSeasonSeriesAvg), zap.Float64("pastSeasonSeriesAvg", pastSeasonSeriesAvg), zap.Float64("past2SeasonSeriesAvg", past2SeasonSeriesAvg), zap.Float64("past3SeasonSeriesAvg", past3SeasonSeriesAvg), zap.Any("labels", series.Labels)) + + predictedSeries := p.getPredictedSeries( + series, + pastPeriodSeries, + currentSeasonSeries, + pastSeasonSeries, + past2SeasonSeries, + past3SeasonSeries, + ) result.PredictedSeries = append(result.PredictedSeries, predictedSeries) - anomalyScoreSeries := p.getAnomalyScores(series, pastPeriodSeries, currentSeasonSeries, pastSeasonSeries) + upperBoundSeries, lowerBoundSeries := p.getBounds( + series, + pastPeriodSeries, + currentSeasonSeries, + pastSeasonSeries, + past2SeasonSeries, + past3SeasonSeries, + zScoreThreshold, + ) + result.UpperBoundSeries = append(result.UpperBoundSeries, upperBoundSeries) + result.LowerBoundSeries = append(result.LowerBoundSeries, lowerBoundSeries) + + anomalyScoreSeries := p.getAnomalyScores( + series, + pastPeriodSeries, + currentSeasonSeries, + pastSeasonSeries, + past2SeasonSeries, + past3SeasonSeries, + ) result.AnomalyScores = append(result.AnomalyScores, anomalyScoreSeries) } } + results := make([]*v3.Result, 0, len(currentPeriodResultsMap)) + for _, result := range currentPeriodResultsMap { + results = append(results, result) + } + return &GetAnomaliesResponse{ - Results: anomalyQueryResults.CurrentPeriodResults, + Results: results, }, nil } diff --git a/ee/query-service/anomaly/weekly.go b/ee/query-service/anomaly/weekly.go index e41261df24..407e7e6440 100644 --- a/ee/query-service/anomaly/weekly.go +++ b/ee/query-service/anomaly/weekly.go @@ -2,6 +2,9 @@ package anomaly import ( "context" + + querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" ) type WeeklyProvider struct { @@ -23,9 +26,18 @@ func NewWeeklyProvider(opts ...GenericProviderOption[*WeeklyProvider]) *WeeklyPr opt(wp) } + wp.querierV2 = querierV2.NewQuerier(querierV2.QuerierOptions{ + Reader: wp.reader, + Cache: wp.cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: wp.fluxInterval, + FeatureLookup: wp.ff, + }) + return wp } func (p *WeeklyProvider) GetAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) { - return nil, nil + req.Seasonality = SeasonalityWeekly + return p.getAnomalies(ctx, req) } diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 9f6086a169..2d99118533 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -370,6 +370,22 @@ type QueryRangeParamsV3 struct { FormatForWeb bool `json:"formatForWeb,omitempty"` } +func (q *QueryRangeParamsV3) Clone() *QueryRangeParamsV3 { + if q == nil { + return nil + } + return &QueryRangeParamsV3{ + Start: q.Start, + End: q.End, + Step: q.Step, + CompositeQuery: q.CompositeQuery.Clone(), + Variables: q.Variables, + NoCache: q.NoCache, + Version: q.Version, + FormatForWeb: q.FormatForWeb, + } +} + type PromQuery struct { Query string `json:"query"` Stats string `json:"stats,omitempty"` @@ -377,6 +393,18 @@ type PromQuery struct { Legend string `json:"legend,omitempty"` } +func (p *PromQuery) Clone() *PromQuery { + if p == nil { + return nil + } + return &PromQuery{ + Query: p.Query, + Stats: p.Stats, + Disabled: p.Disabled, + Legend: p.Legend, + } +} + func (p *PromQuery) Validate() error { if p == nil { return nil @@ -395,6 +423,16 @@ type ClickHouseQuery struct { Legend string `json:"legend,omitempty"` } +func (c *ClickHouseQuery) Clone() *ClickHouseQuery { + if c == nil { + return nil + } + return &ClickHouseQuery{ + Query: c.Query, + Disabled: c.Disabled, + Legend: c.Legend, + } +} func (c *ClickHouseQuery) Validate() error { if c == nil { return nil @@ -420,6 +458,43 @@ type CompositeQuery struct { FillGaps bool `json:"fillGaps,omitempty"` } +func (c *CompositeQuery) Clone() *CompositeQuery { + if c == nil { + return nil + } + var builderQueries map[string]*BuilderQuery + if c.BuilderQueries != nil { + builderQueries = make(map[string]*BuilderQuery) + for name, query := range c.BuilderQueries { + builderQueries[name] = query.Clone() + } + } + var clickHouseQueries map[string]*ClickHouseQuery + if c.ClickHouseQueries != nil { + clickHouseQueries = make(map[string]*ClickHouseQuery) + for name, query := range c.ClickHouseQueries { + clickHouseQueries[name] = query.Clone() + } + } + var promQueries map[string]*PromQuery + if c.PromQueries != nil { + promQueries = make(map[string]*PromQuery) + for name, query := range c.PromQueries { + promQueries[name] = query.Clone() + } + } + return &CompositeQuery{ + BuilderQueries: builderQueries, + ClickHouseQueries: clickHouseQueries, + PromQueries: promQueries, + PanelType: c.PanelType, + QueryType: c.QueryType, + Unit: c.Unit, + FillGaps: c.FillGaps, + } + +} + func (c *CompositeQuery) EnabledQueries() int { count := 0 switch c.QueryType { @@ -645,6 +720,7 @@ const ( FunctionNameMedian5 FunctionName = "median5" FunctionNameMedian7 FunctionName = "median7" FunctionNameTimeShift FunctionName = "timeShift" + FunctionNameAnomaly FunctionName = "anomaly" ) func (f FunctionName) Validate() error { @@ -664,7 +740,8 @@ func (f FunctionName) Validate() error { FunctionNameMedian3, FunctionNameMedian5, FunctionNameMedian7, - FunctionNameTimeShift: + FunctionNameTimeShift, + FunctionNameAnomaly: return nil default: return fmt.Errorf("invalid function name: %s", f) @@ -672,33 +749,68 @@ func (f FunctionName) Validate() error { } type Function struct { - Name FunctionName `json:"name"` - Args []interface{} `json:"args,omitempty"` + Name FunctionName `json:"name"` + Args []interface{} `json:"args,omitempty"` + NamedArgs map[string]interface{} `json:"namedArgs,omitempty"` } type BuilderQuery struct { - QueryName string `json:"queryName"` - StepInterval int64 `json:"stepInterval"` - DataSource DataSource `json:"dataSource"` - AggregateOperator AggregateOperator `json:"aggregateOperator"` - AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"` - Temporality Temporality `json:"temporality,omitempty"` - Filters *FilterSet `json:"filters,omitempty"` - GroupBy []AttributeKey `json:"groupBy,omitempty"` - Expression string `json:"expression"` - Disabled bool `json:"disabled"` - Having []Having `json:"having,omitempty"` - Legend string `json:"legend,omitempty"` - Limit uint64 `json:"limit"` - Offset uint64 `json:"offset"` - PageSize uint64 `json:"pageSize"` - OrderBy []OrderBy `json:"orderBy,omitempty"` - ReduceTo ReduceToOperator `json:"reduceTo,omitempty"` - SelectColumns []AttributeKey `json:"selectColumns,omitempty"` - TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"` - SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"` - Functions []Function `json:"functions,omitempty"` - ShiftBy int64 + QueryName string `json:"queryName"` + StepInterval int64 `json:"stepInterval"` + DataSource DataSource `json:"dataSource"` + AggregateOperator AggregateOperator `json:"aggregateOperator"` + AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"` + Temporality Temporality `json:"temporality,omitempty"` + Filters *FilterSet `json:"filters,omitempty"` + GroupBy []AttributeKey `json:"groupBy,omitempty"` + Expression string `json:"expression"` + Disabled bool `json:"disabled"` + Having []Having `json:"having,omitempty"` + Legend string `json:"legend,omitempty"` + Limit uint64 `json:"limit"` + Offset uint64 `json:"offset"` + PageSize uint64 `json:"pageSize"` + OrderBy []OrderBy `json:"orderBy,omitempty"` + ReduceTo ReduceToOperator `json:"reduceTo,omitempty"` + SelectColumns []AttributeKey `json:"selectColumns,omitempty"` + TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"` + SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"` + Functions []Function `json:"functions,omitempty"` + ShiftBy int64 + IsAnomaly bool + QueriesUsedInFormula []string +} + +func (b *BuilderQuery) Clone() *BuilderQuery { + if b == nil { + return nil + } + return &BuilderQuery{ + QueryName: b.QueryName, + StepInterval: b.StepInterval, + DataSource: b.DataSource, + AggregateOperator: b.AggregateOperator, + AggregateAttribute: b.AggregateAttribute, + Temporality: b.Temporality, + Filters: b.Filters.Clone(), + GroupBy: b.GroupBy, + Expression: b.Expression, + Disabled: b.Disabled, + Having: b.Having, + Legend: b.Legend, + Limit: b.Limit, + Offset: b.Offset, + PageSize: b.PageSize, + OrderBy: b.OrderBy, + ReduceTo: b.ReduceTo, + SelectColumns: b.SelectColumns, + TimeAggregation: b.TimeAggregation, + SpaceAggregation: b.SpaceAggregation, + Functions: b.Functions, + ShiftBy: b.ShiftBy, + IsAnomaly: b.IsAnomaly, + QueriesUsedInFormula: b.QueriesUsedInFormula, + } } // CanDefaultZero returns true if the missing value can be substituted by zero @@ -877,6 +989,16 @@ type FilterSet struct { Items []FilterItem `json:"items"` } +func (f *FilterSet) Clone() *FilterSet { + if f == nil { + return nil + } + return &FilterSet{ + Operator: f.Operator, + Items: f.Items, + } +} + func (f *FilterSet) Validate() error { if f == nil { return nil @@ -1028,12 +1150,15 @@ type Table struct { } type Result struct { - QueryName string `json:"queryName,omitempty"` - Series []*Series `json:"series,omitempty"` - PredictedSeries []*Series `json:"predictedSeries,omitempty"` - AnomalyScores []*Series `json:"anomalyScores,omitempty"` - List []*Row `json:"list,omitempty"` - Table *Table `json:"table,omitempty"` + QueryName string `json:"queryName,omitempty"` + Series []*Series `json:"series,omitempty"` + PredictedSeries []*Series `json:"predictedSeries,omitempty"` + UpperBoundSeries []*Series `json:"upperBoundSeries,omitempty"` + LowerBoundSeries []*Series `json:"lowerBoundSeries,omitempty"` + AnomalyScores []*Series `json:"anomalyScores,omitempty"` + List []*Row `json:"list,omitempty"` + Table *Table `json:"table,omitempty"` + IsAnomaly bool `json:"isAnomaly,omitempty"` } type Series struct {