chore: use mean of past, past2 and past3 seasons for growth (#5974)

This commit is contained in:
Srikanth Chekuri 2024-09-17 16:12:17 +05:30 committed by GitHub
parent 8c891f0e87
commit 06a89b21da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 549 additions and 97 deletions

View File

@ -2,6 +2,9 @@ package anomaly
import (
"context"
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
)
type DailyProvider struct {
@ -24,9 +27,18 @@ func NewDailyProvider(opts ...GenericProviderOption[*DailyProvider]) *DailyProvi
opt(dp)
}
dp.querierV2 = querierV2.NewQuerier(querierV2.QuerierOptions{
Reader: dp.reader,
Cache: dp.cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: dp.fluxInterval,
FeatureLookup: dp.ff,
})
return dp
}
func (p *DailyProvider) GetAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) {
return nil, nil
req.Seasonality = SeasonalityDaily
return p.getAnomalies(ctx, req)
}

View File

@ -2,6 +2,9 @@ package anomaly
import (
"context"
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
)
type HourlyProvider struct {
@ -24,9 +27,18 @@ func NewHourlyProvider(opts ...GenericProviderOption[*HourlyProvider]) *HourlyPr
opt(hp)
}
hp.querierV2 = querierV2.NewQuerier(querierV2.QuerierOptions{
Reader: hp.reader,
Cache: hp.cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: hp.fluxInterval,
FeatureLookup: hp.ff,
})
return hp
}
func (p *HourlyProvider) GetAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) {
return nil, nil
req.Seasonality = SeasonalityHourly
return p.getAnomalies(ctx, req)
}

View File

@ -16,6 +16,13 @@ const (
SeasonalityWeekly Seasonality = "weekly"
)
var (
oneWeekOffset = 24 * 7 * time.Hour.Milliseconds()
oneDayOffset = 24 * time.Hour.Milliseconds()
oneHourOffset = time.Hour.Milliseconds()
fiveMinOffset = 5 * time.Minute.Milliseconds()
)
func (s Seasonality) IsValid() bool {
switch s {
case SeasonalityHourly, SeasonalityDaily, SeasonalityWeekly:
@ -35,7 +42,7 @@ type GetAnomaliesResponse struct {
}
// anomalyParams is the params for anomaly detection
// prediction = avg(past_period_query) + avg(current_season_query) - avg(past_season_query)
// prediction = avg(past_period_query) + avg(current_season_query) - mean(past_season_query, past2_season_query, past3_season_query)
//
// ^ ^
// | |
@ -49,9 +56,9 @@ type anomalyQueryParams struct {
// and to detect anomalies
CurrentPeriodQuery *v3.QueryRangeParamsV3
// PastPeriodQuery is the query range params for past seasonal period
// Example: For weekly seasonality, (now-1w-4h-5m, now-1w)
// : For daily seasonality, (now-1d-2h-5m, now-1d)
// : For hourly seasonality, (now-1h-30m-5m, now-1h)
// Example: For weekly seasonality, (now-1w-5m, now-1w)
// : For daily seasonality, (now-1d-5m, now-1d)
// : For hourly seasonality, (now-1h-5m, now-1h)
PastPeriodQuery *v3.QueryRangeParamsV3
// CurrentSeasonQuery is the query range params for current period (seasonal)
// Example: For weekly seasonality, this is the query range params for the (now-1w-5m, now)
@ -63,16 +70,17 @@ type anomalyQueryParams struct {
// : For daily seasonality, this is the query range params for the (now-2d-5m, now-1d)
// : For hourly seasonality, this is the query range params for the (now-2h-5m, now-1h)
PastSeasonQuery *v3.QueryRangeParamsV3
}
func copyCompositeQuery(req *v3.QueryRangeParamsV3) *v3.CompositeQuery {
deepCopyCompositeQuery := *req.CompositeQuery
deepCopyCompositeQuery.BuilderQueries = make(map[string]*v3.BuilderQuery)
for k, v := range req.CompositeQuery.BuilderQueries {
query := *v
deepCopyCompositeQuery.BuilderQueries[k] = &query
}
return &deepCopyCompositeQuery
// Past2SeasonQuery is the query range params for past 2 seasonal period to the current season
// Example: For weekly seasonality, this is the query range params for the (now-3w-5m, now-2w)
// : For daily seasonality, this is the query range params for the (now-3d-5m, now-2d)
// : For hourly seasonality, this is the query range params for the (now-3h-5m, now-2h)
Past2SeasonQuery *v3.QueryRangeParamsV3
// Past3SeasonQuery is the query range params for past 3 seasonal period to the current season
// Example: For weekly seasonality, this is the query range params for the (now-4w-5m, now-3w)
// : For daily seasonality, this is the query range params for the (now-4d-5m, now-3d)
// : For hourly seasonality, this is the query range params for the (now-4h-5m, now-3h)
Past3SeasonQuery *v3.QueryRangeParamsV3
}
func updateStepInterval(req *v3.QueryRangeParamsV3) {
@ -95,7 +103,7 @@ func prepareAnomalyQueryParams(req *v3.QueryRangeParamsV3, seasonality Seasonali
currentPeriodQuery := &v3.QueryRangeParamsV3{
Start: start,
End: end,
CompositeQuery: req.CompositeQuery,
CompositeQuery: req.CompositeQuery.Clone(),
Variables: make(map[string]interface{}, 0),
NoCache: false,
}
@ -104,24 +112,24 @@ func prepareAnomalyQueryParams(req *v3.QueryRangeParamsV3, seasonality Seasonali
var pastPeriodStart, pastPeriodEnd int64
switch seasonality {
// for one week period, we fetch the data from the past week with 4 hours offset
// for one week period, we fetch the data from the past week with 5 min offset
case SeasonalityWeekly:
pastPeriodStart = start - 166*time.Hour.Milliseconds() - 4*time.Hour.Milliseconds()
pastPeriodEnd = end - 166*time.Hour.Milliseconds()
// for one day period, we fetch the data from the past day with 2 hours offset
pastPeriodStart = start - oneWeekOffset - fiveMinOffset
pastPeriodEnd = end - oneWeekOffset
// for one day period, we fetch the data from the past day with 5 min offset
case SeasonalityDaily:
pastPeriodStart = start - 23*time.Hour.Milliseconds() - 2*time.Hour.Milliseconds()
pastPeriodEnd = end - 23*time.Hour.Milliseconds()
// for one hour period, we fetch the data from the past hour with 30 minutes offset
pastPeriodStart = start - oneDayOffset - fiveMinOffset
pastPeriodEnd = end - oneDayOffset
// for one hour period, we fetch the data from the past hour with 5 min offset
case SeasonalityHourly:
pastPeriodStart = start - 1*time.Hour.Milliseconds() - 30*time.Minute.Milliseconds()
pastPeriodEnd = end - 1*time.Hour.Milliseconds()
pastPeriodStart = start - oneHourOffset - fiveMinOffset
pastPeriodEnd = end - oneHourOffset
}
pastPeriodQuery := &v3.QueryRangeParamsV3{
Start: pastPeriodStart,
End: pastPeriodEnd,
CompositeQuery: copyCompositeQuery(req),
CompositeQuery: req.CompositeQuery.Clone(),
Variables: make(map[string]interface{}, 0),
NoCache: false,
}
@ -131,20 +139,20 @@ func prepareAnomalyQueryParams(req *v3.QueryRangeParamsV3, seasonality Seasonali
var currentGrowthPeriodStart, currentGrowthPeriodEnd int64
switch seasonality {
case SeasonalityWeekly:
currentGrowthPeriodStart = start - 7*24*time.Hour.Milliseconds()
currentGrowthPeriodStart = start - oneWeekOffset
currentGrowthPeriodEnd = end
case SeasonalityDaily:
currentGrowthPeriodStart = start - 23*time.Hour.Milliseconds()
currentGrowthPeriodStart = start - oneDayOffset
currentGrowthPeriodEnd = end
case SeasonalityHourly:
currentGrowthPeriodStart = start - 1*time.Hour.Milliseconds()
currentGrowthPeriodStart = start - oneHourOffset
currentGrowthPeriodEnd = end
}
currentGrowthQuery := &v3.QueryRangeParamsV3{
Start: currentGrowthPeriodStart,
End: currentGrowthPeriodEnd,
CompositeQuery: copyCompositeQuery(req),
CompositeQuery: req.CompositeQuery.Clone(),
Variables: make(map[string]interface{}, 0),
NoCache: false,
}
@ -153,30 +161,76 @@ func prepareAnomalyQueryParams(req *v3.QueryRangeParamsV3, seasonality Seasonali
var pastGrowthPeriodStart, pastGrowthPeriodEnd int64
switch seasonality {
case SeasonalityWeekly:
pastGrowthPeriodStart = start - 14*24*time.Hour.Milliseconds()
pastGrowthPeriodEnd = start - 7*24*time.Hour.Milliseconds()
pastGrowthPeriodStart = start - 2*oneWeekOffset
pastGrowthPeriodEnd = start - 1*oneWeekOffset
case SeasonalityDaily:
pastGrowthPeriodStart = start - 2*time.Hour.Milliseconds()
pastGrowthPeriodEnd = start - 1*time.Hour.Milliseconds()
pastGrowthPeriodStart = start - 2*oneDayOffset
pastGrowthPeriodEnd = start - 1*oneDayOffset
case SeasonalityHourly:
pastGrowthPeriodStart = start - 2*time.Hour.Milliseconds()
pastGrowthPeriodEnd = start - 1*time.Hour.Milliseconds()
pastGrowthPeriodStart = start - 2*oneHourOffset
pastGrowthPeriodEnd = start - 1*oneHourOffset
}
pastGrowthQuery := &v3.QueryRangeParamsV3{
Start: pastGrowthPeriodStart,
End: pastGrowthPeriodEnd,
CompositeQuery: copyCompositeQuery(req),
CompositeQuery: req.CompositeQuery.Clone(),
Variables: make(map[string]interface{}, 0),
NoCache: false,
}
updateStepInterval(pastGrowthQuery)
var past2GrowthPeriodStart, past2GrowthPeriodEnd int64
switch seasonality {
case SeasonalityWeekly:
past2GrowthPeriodStart = start - 3*oneWeekOffset
past2GrowthPeriodEnd = start - 2*oneWeekOffset
case SeasonalityDaily:
past2GrowthPeriodStart = start - 3*oneDayOffset
past2GrowthPeriodEnd = start - 2*oneDayOffset
case SeasonalityHourly:
past2GrowthPeriodStart = start - 3*oneHourOffset
past2GrowthPeriodEnd = start - 2*oneHourOffset
}
past2GrowthQuery := &v3.QueryRangeParamsV3{
Start: past2GrowthPeriodStart,
End: past2GrowthPeriodEnd,
CompositeQuery: req.CompositeQuery.Clone(),
Variables: make(map[string]interface{}, 0),
NoCache: false,
}
updateStepInterval(past2GrowthQuery)
var past3GrowthPeriodStart, past3GrowthPeriodEnd int64
switch seasonality {
case SeasonalityWeekly:
past3GrowthPeriodStart = start - 4*oneWeekOffset
past3GrowthPeriodEnd = start - 3*oneWeekOffset
case SeasonalityDaily:
past3GrowthPeriodStart = start - 4*oneDayOffset
past3GrowthPeriodEnd = start - 3*oneDayOffset
case SeasonalityHourly:
past3GrowthPeriodStart = start - 4*oneHourOffset
past3GrowthPeriodEnd = start - 3*oneHourOffset
}
past3GrowthQuery := &v3.QueryRangeParamsV3{
Start: past3GrowthPeriodStart,
End: past3GrowthPeriodEnd,
CompositeQuery: req.CompositeQuery.Clone(),
Variables: make(map[string]interface{}, 0),
NoCache: false,
}
updateStepInterval(past3GrowthQuery)
return &anomalyQueryParams{
CurrentPeriodQuery: currentPeriodQuery,
PastPeriodQuery: pastPeriodQuery,
CurrentSeasonQuery: currentGrowthQuery,
PastSeasonQuery: pastGrowthQuery,
Past2SeasonQuery: past2GrowthQuery,
Past3SeasonQuery: past3GrowthQuery,
}
}
@ -185,4 +239,6 @@ type anomalyQueryResults struct {
PastPeriodResults []*v3.Result
CurrentSeasonResults []*v3.Result
PastSeasonResults []*v3.Result
Past2SeasonResults []*v3.Result
Past3SeasonResults []*v3.Result
}

View File

@ -3,14 +3,21 @@ package anomaly
import (
"context"
"math"
"time"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/interfaces"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/postprocess"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.uber.org/zap"
)
var (
// TODO(srikanthccv): make this configurable?
movingAvgWindowSize = 7
)
// BaseProvider is an interface that includes common methods for all provider types
type BaseProvider interface {
GetBaseSeasonalProvider() *BaseSeasonalProvider
@ -46,6 +53,7 @@ func WithReader[T BaseProvider](reader interfaces.Reader) GenericProviderOption[
type BaseSeasonalProvider struct {
querierV2 interfaces.Querier
reader interfaces.Reader
fluxInterval time.Duration
cache cache.Cache
keyGenerator cache.KeyGenerator
ff interfaces.FeatureLookup
@ -53,28 +61,68 @@ type BaseSeasonalProvider struct {
func (p *BaseSeasonalProvider) getQueryParams(req *GetAnomaliesRequest) *anomalyQueryParams {
if !req.Seasonality.IsValid() {
req.Seasonality = SeasonalityWeekly
req.Seasonality = SeasonalityDaily
}
return prepareAnomalyQueryParams(req.Params, req.Seasonality)
}
func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQueryParams) (*anomalyQueryResults, error) {
currentPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentPeriodQuery, nil)
currentPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentPeriodQuery)
if err != nil {
return nil, err
}
pastPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.PastPeriodQuery, nil)
currentPeriodResults, err = postprocess.PostProcessResult(currentPeriodResults, params.CurrentPeriodQuery)
if err != nil {
return nil, err
}
currentSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentSeasonQuery, nil)
pastPeriodResults, _, err := p.querierV2.QueryRange(ctx, params.PastPeriodQuery)
if err != nil {
return nil, err
}
pastSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.PastSeasonQuery, nil)
pastPeriodResults, err = postprocess.PostProcessResult(pastPeriodResults, params.PastPeriodQuery)
if err != nil {
return nil, err
}
currentSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.CurrentSeasonQuery)
if err != nil {
return nil, err
}
currentSeasonResults, err = postprocess.PostProcessResult(currentSeasonResults, params.CurrentSeasonQuery)
if err != nil {
return nil, err
}
pastSeasonResults, _, err := p.querierV2.QueryRange(ctx, params.PastSeasonQuery)
if err != nil {
return nil, err
}
pastSeasonResults, err = postprocess.PostProcessResult(pastSeasonResults, params.PastSeasonQuery)
if err != nil {
return nil, err
}
past2SeasonResults, _, err := p.querierV2.QueryRange(ctx, params.Past2SeasonQuery)
if err != nil {
return nil, err
}
past2SeasonResults, err = postprocess.PostProcessResult(past2SeasonResults, params.Past2SeasonQuery)
if err != nil {
return nil, err
}
past3SeasonResults, _, err := p.querierV2.QueryRange(ctx, params.Past3SeasonQuery)
if err != nil {
return nil, err
}
past3SeasonResults, err = postprocess.PostProcessResult(past3SeasonResults, params.Past3SeasonQuery)
if err != nil {
return nil, err
}
@ -84,10 +132,18 @@ func (p *BaseSeasonalProvider) getResults(ctx context.Context, params *anomalyQu
PastPeriodResults: pastPeriodResults,
CurrentSeasonResults: currentSeasonResults,
PastSeasonResults: pastSeasonResults,
Past2SeasonResults: past2SeasonResults,
Past3SeasonResults: past3SeasonResults,
}, nil
}
// getMatchingSeries gets the matching series from the query result
// for the given series
func (p *BaseSeasonalProvider) getMatchingSeries(queryResult *v3.Result, series *v3.Series) *v3.Series {
if queryResult == nil || len(queryResult.Series) == 0 {
return nil
}
for _, curr := range queryResult.Series {
currLabels := labels.FromMap(curr.Labels)
seriesLabels := labels.FromMap(series.Labels)
@ -99,6 +155,9 @@ func (p *BaseSeasonalProvider) getMatchingSeries(queryResult *v3.Result, series
}
func (p *BaseSeasonalProvider) getAvg(series *v3.Series) float64 {
if series == nil || len(series.Points) == 0 {
return 0
}
var sum float64
for _, smpl := range series.Points {
sum += smpl.Value
@ -107,6 +166,9 @@ func (p *BaseSeasonalProvider) getAvg(series *v3.Series) float64 {
}
func (p *BaseSeasonalProvider) getStdDev(series *v3.Series) float64 {
if series == nil || len(series.Points) == 0 {
return 0
}
avg := p.getAvg(series)
var sum float64
for _, smpl := range series.Points {
@ -115,15 +177,65 @@ func (p *BaseSeasonalProvider) getStdDev(series *v3.Series) float64 {
return math.Sqrt(sum / float64(len(series.Points)))
}
func (p *BaseSeasonalProvider) getPredictedSeries(series, prevSeries, currentSeasonSeries, pastSeasonSeries *v3.Series) *v3.Series {
// getMovingAvg gets the moving average for the given series
// for the given window size and start index
func (p *BaseSeasonalProvider) getMovingAvg(series *v3.Series, movingAvgWindowSize, startIdx int) float64 {
if series == nil || len(series.Points) == 0 {
return 0
}
if startIdx >= len(series.Points)-movingAvgWindowSize {
startIdx = len(series.Points) - movingAvgWindowSize
}
var sum float64
points := series.Points[startIdx:]
for i := 0; i < movingAvgWindowSize && i < len(points); i++ {
sum += points[i].Value
}
avg := sum / float64(movingAvgWindowSize)
return avg
}
func (p *BaseSeasonalProvider) getMean(floats ...float64) float64 {
if len(floats) == 0 {
return 0
}
var sum float64
for _, f := range floats {
sum += f
}
return sum / float64(len(floats))
}
func (p *BaseSeasonalProvider) getPredictedSeries(
series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *v3.Series,
) *v3.Series {
predictedSeries := &v3.Series{
Labels: series.Labels,
LabelsArray: series.LabelsArray,
Points: []v3.Point{},
}
for _, curr := range series.Points {
predictedValue := p.getAvg(prevSeries) + p.getAvg(currentSeasonSeries) - p.getAvg(pastSeasonSeries)
// for each point in the series, get the predicted value
// the predicted value is the moving average (with window size = 7) of the previous period series
// plus the average of the current season series
// minus the mean of the past season series, past2 season series and past3 season series
for idx, curr := range series.Points {
predictedValue :=
p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) +
p.getAvg(currentSeasonSeries) -
p.getMean(p.getAvg(pastSeasonSeries), p.getAvg(past2SeasonSeries), p.getAvg(past3SeasonSeries))
if predictedValue < 0 {
predictedValue = p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
}
zap.L().Info("predictedSeries",
zap.Float64("movingAvg", p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)),
zap.Float64("avg", p.getAvg(currentSeasonSeries)),
zap.Float64("mean", p.getMean(p.getAvg(pastSeasonSeries), p.getAvg(past2SeasonSeries), p.getAvg(past3SeasonSeries))),
zap.Any("labels", series.Labels),
zap.Float64("predictedValue", predictedValue),
)
predictedSeries.Points = append(predictedSeries.Points, v3.Point{
Timestamp: curr.Timestamp,
Value: predictedValue,
@ -133,33 +245,80 @@ func (p *BaseSeasonalProvider) getPredictedSeries(series, prevSeries, currentSea
return predictedSeries
}
func (p *BaseSeasonalProvider) getExpectedValue(_, prevSeries, currentSeasonSeries, pastSeasonSeries *v3.Series) float64 {
prevSeriesAvg := p.getAvg(prevSeries)
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries)
zap.L().Debug("getExpectedValue",
zap.Float64("prevSeriesAvg", prevSeriesAvg),
zap.Float64("currentSeasonSeriesAvg", currentSeasonSeriesAvg),
zap.Float64("pastSeasonSeriesAvg", pastSeasonSeriesAvg),
zap.Float64("expectedValue", prevSeriesAvg+currentSeasonSeriesAvg-pastSeasonSeriesAvg),
)
return prevSeriesAvg + currentSeasonSeriesAvg - pastSeasonSeriesAvg
// getBounds gets the upper and lower bounds for the given series
// for the given z score threshold
// moving avg of the previous period series + z score threshold * std dev of the series
// moving avg of the previous period series - z score threshold * std dev of the series
func (p *BaseSeasonalProvider) getBounds(
series, prevSeries, _, _, _, _ *v3.Series,
zScoreThreshold float64,
) (*v3.Series, *v3.Series) {
upperBoundSeries := &v3.Series{
Labels: series.Labels,
LabelsArray: series.LabelsArray,
Points: []v3.Point{},
}
lowerBoundSeries := &v3.Series{
Labels: series.Labels,
LabelsArray: series.LabelsArray,
Points: []v3.Point{},
}
for idx, curr := range series.Points {
upperBound := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) + zScoreThreshold*p.getStdDev(series)
lowerBound := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx) - zScoreThreshold*p.getStdDev(series)
upperBoundSeries.Points = append(upperBoundSeries.Points, v3.Point{
Timestamp: curr.Timestamp,
Value: upperBound,
})
lowerBoundSeries.Points = append(lowerBoundSeries.Points, v3.Point{
Timestamp: curr.Timestamp,
Value: math.Max(lowerBound, 0),
})
}
return upperBoundSeries, lowerBoundSeries
}
func (p *BaseSeasonalProvider) getScore(series, prevSeries, weekSeries, weekPrevSeries *v3.Series, value float64) float64 {
expectedValue := p.getExpectedValue(series, prevSeries, weekSeries, weekPrevSeries)
// getExpectedValue gets the expected value for the given series
// for the given index
// prevSeriesAvg + currentSeasonSeriesAvg - mean of past season series, past2 season series and past3 season series
func (p *BaseSeasonalProvider) getExpectedValue(
_, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *v3.Series, idx int,
) float64 {
prevSeriesAvg := p.getMovingAvg(prevSeries, movingAvgWindowSize, idx)
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries)
past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries)
past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries)
return prevSeriesAvg + currentSeasonSeriesAvg - p.getMean(pastSeasonSeriesAvg, past2SeasonSeriesAvg, past3SeasonSeriesAvg)
}
// getScore gets the anomaly score for the given series
// for the given index
// (value - expectedValue) / std dev of the series
func (p *BaseSeasonalProvider) getScore(
series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries *v3.Series, value float64, idx int,
) float64 {
expectedValue := p.getExpectedValue(series, prevSeries, weekSeries, weekPrevSeries, past2SeasonSeries, past3SeasonSeries, idx)
return (value - expectedValue) / p.getStdDev(weekSeries)
}
func (p *BaseSeasonalProvider) getAnomalyScores(series, prevSeries, currentSeasonSeries, pastSeasonSeries *v3.Series) *v3.Series {
// getAnomalyScores gets the anomaly scores for the given series
// for the given index
// (value - expectedValue) / std dev of the series
func (p *BaseSeasonalProvider) getAnomalyScores(
series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries *v3.Series,
) *v3.Series {
anomalyScoreSeries := &v3.Series{
Labels: series.Labels,
LabelsArray: series.LabelsArray,
Points: []v3.Point{},
}
for _, curr := range series.Points {
anomalyScore := p.getScore(series, prevSeries, currentSeasonSeries, pastSeasonSeries, curr.Value)
for idx, curr := range series.Points {
anomalyScore := p.getScore(series, prevSeries, currentSeasonSeries, pastSeasonSeries, past2SeasonSeries, past3SeasonSeries, curr.Value, idx)
anomalyScoreSeries.Points = append(anomalyScoreSeries.Points, v3.Point{
Timestamp: curr.Timestamp,
Value: anomalyScore,
@ -169,7 +328,7 @@ func (p *BaseSeasonalProvider) getAnomalyScores(series, prevSeries, currentSeaso
return anomalyScoreSeries
}
func (p *BaseSeasonalProvider) GetAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) {
func (p *BaseSeasonalProvider) getAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) {
anomalyParams := p.getQueryParams(req)
anomalyQueryResults, err := p.getResults(ctx, anomalyParams)
if err != nil {
@ -196,7 +355,32 @@ func (p *BaseSeasonalProvider) GetAnomalies(ctx context.Context, req *GetAnomali
pastSeasonResultsMap[result.QueryName] = result
}
past2SeasonResultsMap := make(map[string]*v3.Result)
for _, result := range anomalyQueryResults.Past2SeasonResults {
past2SeasonResultsMap[result.QueryName] = result
}
past3SeasonResultsMap := make(map[string]*v3.Result)
for _, result := range anomalyQueryResults.Past3SeasonResults {
past3SeasonResultsMap[result.QueryName] = result
}
for _, result := range currentPeriodResultsMap {
funcs := req.Params.CompositeQuery.BuilderQueries[result.QueryName].Functions
var zScoreThreshold float64
for _, f := range funcs {
if f.Name == v3.FunctionNameAnomaly {
value, ok := f.NamedArgs["z_score_threshold"]
if ok {
zScoreThreshold = value.(float64)
} else {
zScoreThreshold = 3
}
break
}
}
pastPeriodResult, ok := pastPeriodResultsMap[result.QueryName]
if !ok {
continue
@ -209,21 +393,72 @@ func (p *BaseSeasonalProvider) GetAnomalies(ctx context.Context, req *GetAnomali
if !ok {
continue
}
past2SeasonResult, ok := past2SeasonResultsMap[result.QueryName]
if !ok {
continue
}
past3SeasonResult, ok := past3SeasonResultsMap[result.QueryName]
if !ok {
continue
}
for _, series := range result.Series {
stdDev := p.getStdDev(series)
zap.L().Info("stdDev", zap.Float64("stdDev", stdDev), zap.Any("labels", series.Labels))
pastPeriodSeries := p.getMatchingSeries(pastPeriodResult, series)
currentSeasonSeries := p.getMatchingSeries(currentSeasonResult, series)
pastSeasonSeries := p.getMatchingSeries(pastSeasonResult, series)
past2SeasonSeries := p.getMatchingSeries(past2SeasonResult, series)
past3SeasonSeries := p.getMatchingSeries(past3SeasonResult, series)
predictedSeries := p.getPredictedSeries(series, pastPeriodSeries, currentSeasonSeries, pastSeasonSeries)
prevSeriesAvg := p.getAvg(pastPeriodSeries)
currentSeasonSeriesAvg := p.getAvg(currentSeasonSeries)
pastSeasonSeriesAvg := p.getAvg(pastSeasonSeries)
past2SeasonSeriesAvg := p.getAvg(past2SeasonSeries)
past3SeasonSeriesAvg := p.getAvg(past3SeasonSeries)
zap.L().Info("getAvg", zap.Float64("prevSeriesAvg", prevSeriesAvg), zap.Float64("currentSeasonSeriesAvg", currentSeasonSeriesAvg), zap.Float64("pastSeasonSeriesAvg", pastSeasonSeriesAvg), zap.Float64("past2SeasonSeriesAvg", past2SeasonSeriesAvg), zap.Float64("past3SeasonSeriesAvg", past3SeasonSeriesAvg), zap.Any("labels", series.Labels))
predictedSeries := p.getPredictedSeries(
series,
pastPeriodSeries,
currentSeasonSeries,
pastSeasonSeries,
past2SeasonSeries,
past3SeasonSeries,
)
result.PredictedSeries = append(result.PredictedSeries, predictedSeries)
anomalyScoreSeries := p.getAnomalyScores(series, pastPeriodSeries, currentSeasonSeries, pastSeasonSeries)
upperBoundSeries, lowerBoundSeries := p.getBounds(
series,
pastPeriodSeries,
currentSeasonSeries,
pastSeasonSeries,
past2SeasonSeries,
past3SeasonSeries,
zScoreThreshold,
)
result.UpperBoundSeries = append(result.UpperBoundSeries, upperBoundSeries)
result.LowerBoundSeries = append(result.LowerBoundSeries, lowerBoundSeries)
anomalyScoreSeries := p.getAnomalyScores(
series,
pastPeriodSeries,
currentSeasonSeries,
pastSeasonSeries,
past2SeasonSeries,
past3SeasonSeries,
)
result.AnomalyScores = append(result.AnomalyScores, anomalyScoreSeries)
}
}
results := make([]*v3.Result, 0, len(currentPeriodResultsMap))
for _, result := range currentPeriodResultsMap {
results = append(results, result)
}
return &GetAnomaliesResponse{
Results: anomalyQueryResults.CurrentPeriodResults,
Results: results,
}, nil
}

View File

@ -2,6 +2,9 @@ package anomaly
import (
"context"
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
)
type WeeklyProvider struct {
@ -23,9 +26,18 @@ func NewWeeklyProvider(opts ...GenericProviderOption[*WeeklyProvider]) *WeeklyPr
opt(wp)
}
wp.querierV2 = querierV2.NewQuerier(querierV2.QuerierOptions{
Reader: wp.reader,
Cache: wp.cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: wp.fluxInterval,
FeatureLookup: wp.ff,
})
return wp
}
func (p *WeeklyProvider) GetAnomalies(ctx context.Context, req *GetAnomaliesRequest) (*GetAnomaliesResponse, error) {
return nil, nil
req.Seasonality = SeasonalityWeekly
return p.getAnomalies(ctx, req)
}

View File

@ -370,6 +370,22 @@ type QueryRangeParamsV3 struct {
FormatForWeb bool `json:"formatForWeb,omitempty"`
}
func (q *QueryRangeParamsV3) Clone() *QueryRangeParamsV3 {
if q == nil {
return nil
}
return &QueryRangeParamsV3{
Start: q.Start,
End: q.End,
Step: q.Step,
CompositeQuery: q.CompositeQuery.Clone(),
Variables: q.Variables,
NoCache: q.NoCache,
Version: q.Version,
FormatForWeb: q.FormatForWeb,
}
}
type PromQuery struct {
Query string `json:"query"`
Stats string `json:"stats,omitempty"`
@ -377,6 +393,18 @@ type PromQuery struct {
Legend string `json:"legend,omitempty"`
}
func (p *PromQuery) Clone() *PromQuery {
if p == nil {
return nil
}
return &PromQuery{
Query: p.Query,
Stats: p.Stats,
Disabled: p.Disabled,
Legend: p.Legend,
}
}
func (p *PromQuery) Validate() error {
if p == nil {
return nil
@ -395,6 +423,16 @@ type ClickHouseQuery struct {
Legend string `json:"legend,omitempty"`
}
func (c *ClickHouseQuery) Clone() *ClickHouseQuery {
if c == nil {
return nil
}
return &ClickHouseQuery{
Query: c.Query,
Disabled: c.Disabled,
Legend: c.Legend,
}
}
func (c *ClickHouseQuery) Validate() error {
if c == nil {
return nil
@ -420,6 +458,43 @@ type CompositeQuery struct {
FillGaps bool `json:"fillGaps,omitempty"`
}
func (c *CompositeQuery) Clone() *CompositeQuery {
if c == nil {
return nil
}
var builderQueries map[string]*BuilderQuery
if c.BuilderQueries != nil {
builderQueries = make(map[string]*BuilderQuery)
for name, query := range c.BuilderQueries {
builderQueries[name] = query.Clone()
}
}
var clickHouseQueries map[string]*ClickHouseQuery
if c.ClickHouseQueries != nil {
clickHouseQueries = make(map[string]*ClickHouseQuery)
for name, query := range c.ClickHouseQueries {
clickHouseQueries[name] = query.Clone()
}
}
var promQueries map[string]*PromQuery
if c.PromQueries != nil {
promQueries = make(map[string]*PromQuery)
for name, query := range c.PromQueries {
promQueries[name] = query.Clone()
}
}
return &CompositeQuery{
BuilderQueries: builderQueries,
ClickHouseQueries: clickHouseQueries,
PromQueries: promQueries,
PanelType: c.PanelType,
QueryType: c.QueryType,
Unit: c.Unit,
FillGaps: c.FillGaps,
}
}
func (c *CompositeQuery) EnabledQueries() int {
count := 0
switch c.QueryType {
@ -645,6 +720,7 @@ const (
FunctionNameMedian5 FunctionName = "median5"
FunctionNameMedian7 FunctionName = "median7"
FunctionNameTimeShift FunctionName = "timeShift"
FunctionNameAnomaly FunctionName = "anomaly"
)
func (f FunctionName) Validate() error {
@ -664,7 +740,8 @@ func (f FunctionName) Validate() error {
FunctionNameMedian3,
FunctionNameMedian5,
FunctionNameMedian7,
FunctionNameTimeShift:
FunctionNameTimeShift,
FunctionNameAnomaly:
return nil
default:
return fmt.Errorf("invalid function name: %s", f)
@ -672,33 +749,68 @@ func (f FunctionName) Validate() error {
}
type Function struct {
Name FunctionName `json:"name"`
Args []interface{} `json:"args,omitempty"`
Name FunctionName `json:"name"`
Args []interface{} `json:"args,omitempty"`
NamedArgs map[string]interface{} `json:"namedArgs,omitempty"`
}
type BuilderQuery struct {
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
ShiftBy int64
QueryName string `json:"queryName"`
StepInterval int64 `json:"stepInterval"`
DataSource DataSource `json:"dataSource"`
AggregateOperator AggregateOperator `json:"aggregateOperator"`
AggregateAttribute AttributeKey `json:"aggregateAttribute,omitempty"`
Temporality Temporality `json:"temporality,omitempty"`
Filters *FilterSet `json:"filters,omitempty"`
GroupBy []AttributeKey `json:"groupBy,omitempty"`
Expression string `json:"expression"`
Disabled bool `json:"disabled"`
Having []Having `json:"having,omitempty"`
Legend string `json:"legend,omitempty"`
Limit uint64 `json:"limit"`
Offset uint64 `json:"offset"`
PageSize uint64 `json:"pageSize"`
OrderBy []OrderBy `json:"orderBy,omitempty"`
ReduceTo ReduceToOperator `json:"reduceTo,omitempty"`
SelectColumns []AttributeKey `json:"selectColumns,omitempty"`
TimeAggregation TimeAggregation `json:"timeAggregation,omitempty"`
SpaceAggregation SpaceAggregation `json:"spaceAggregation,omitempty"`
Functions []Function `json:"functions,omitempty"`
ShiftBy int64
IsAnomaly bool
QueriesUsedInFormula []string
}
func (b *BuilderQuery) Clone() *BuilderQuery {
if b == nil {
return nil
}
return &BuilderQuery{
QueryName: b.QueryName,
StepInterval: b.StepInterval,
DataSource: b.DataSource,
AggregateOperator: b.AggregateOperator,
AggregateAttribute: b.AggregateAttribute,
Temporality: b.Temporality,
Filters: b.Filters.Clone(),
GroupBy: b.GroupBy,
Expression: b.Expression,
Disabled: b.Disabled,
Having: b.Having,
Legend: b.Legend,
Limit: b.Limit,
Offset: b.Offset,
PageSize: b.PageSize,
OrderBy: b.OrderBy,
ReduceTo: b.ReduceTo,
SelectColumns: b.SelectColumns,
TimeAggregation: b.TimeAggregation,
SpaceAggregation: b.SpaceAggregation,
Functions: b.Functions,
ShiftBy: b.ShiftBy,
IsAnomaly: b.IsAnomaly,
QueriesUsedInFormula: b.QueriesUsedInFormula,
}
}
// CanDefaultZero returns true if the missing value can be substituted by zero
@ -877,6 +989,16 @@ type FilterSet struct {
Items []FilterItem `json:"items"`
}
func (f *FilterSet) Clone() *FilterSet {
if f == nil {
return nil
}
return &FilterSet{
Operator: f.Operator,
Items: f.Items,
}
}
func (f *FilterSet) Validate() error {
if f == nil {
return nil
@ -1028,12 +1150,15 @@ type Table struct {
}
type Result struct {
QueryName string `json:"queryName,omitempty"`
Series []*Series `json:"series,omitempty"`
PredictedSeries []*Series `json:"predictedSeries,omitempty"`
AnomalyScores []*Series `json:"anomalyScores,omitempty"`
List []*Row `json:"list,omitempty"`
Table *Table `json:"table,omitempty"`
QueryName string `json:"queryName,omitempty"`
Series []*Series `json:"series,omitempty"`
PredictedSeries []*Series `json:"predictedSeries,omitempty"`
UpperBoundSeries []*Series `json:"upperBoundSeries,omitempty"`
LowerBoundSeries []*Series `json:"lowerBoundSeries,omitempty"`
AnomalyScores []*Series `json:"anomalyScores,omitempty"`
List []*Row `json:"list,omitempty"`
Table *Table `json:"table,omitempty"`
IsAnomaly bool `json:"isAnomaly,omitempty"`
}
type Series struct {