From e7fabca38e27d94b9cc0960f6fec4a9a75c7bd9f Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 8 Jan 2024 18:00:42 +0530 Subject: [PATCH] chore: fix the query range cache gaps (#4283) --- pkg/query-service/app/querier/helper.go | 4 +-- pkg/query-service/app/querier/querier.go | 13 ++++++--- pkg/query-service/app/querier/querier_test.go | 18 ++++++++++-- pkg/query-service/model/v3/v3.go | 29 +++++++++++++++++++ 4 files changed, 56 insertions(+), 8 deletions(-) diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index 82bf2cc464..5bb3cc81f8 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -160,7 +160,7 @@ func (q *querier) runBuilderQuery( if !params.NoCache && q.cache != nil { var retrieveStatus status.RetrieveStatus data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + zap.S().Infof("cache retrieve status: %s", retrieveStatus.String()) if err == nil { cachedData = data } @@ -254,7 +254,7 @@ func (q *querier) runBuilderExpression( if !params.NoCache && q.cache != nil { var retrieveStatus status.RetrieveStatus data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + zap.S().Infof("cache retrieve status: %s", retrieveStatus.String()) if err == nil { cachedData = data } diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index d51e5713bf..395659ced2 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -145,7 +145,7 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangePar // // The [End - fluxInterval, End] is always added to the list of misses, because // the data might still be in flux and not yet available in the database. -func findMissingTimeRanges(start, end int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval) { +func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval) { var cachedStart, cachedEnd int64 for idx := range seriesList { series := seriesList[idx] @@ -160,11 +160,15 @@ func findMissingTimeRanges(start, end int64, seriesList []*v3.Series, fluxInterv } } + endMillis := time.Now().UnixMilli() + adjustStep := int64(math.Min(float64(step), 60)) + roundedMillis := endMillis - (endMillis % (adjustStep * 1000)) + // Exclude the flux interval from the cached end time cachedEnd = int64( math.Min( float64(cachedEnd), - float64(time.Now().UnixMilli()-fluxInterval.Milliseconds()), + float64(roundedMillis-fluxInterval.Milliseconds()), ), ) @@ -215,7 +219,7 @@ func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byt // In case of error, we return the entire range as a miss return []missInterval{{start: start, end: end}} } - return findMissingTimeRanges(start, end, cachedSeriesList, q.fluxInterval) + return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval) } func labelsToString(labels map[string]string) string { @@ -258,6 +262,7 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series { for idx := range seriesesByLabels { series := seriesesByLabels[idx] series.SortPoints() + series.RemoveDuplicatePoints() mergedSeries = append(mergedSeries, series) } return mergedSeries @@ -326,7 +331,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam // Ensure NoCache is not set and cache is not nil if !params.NoCache && q.cache != nil { data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) - zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + zap.S().Infof("cache retrieve status: %s", retrieveStatus.String()) if err == nil { cachedData = data } diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index f08ae82dcd..605d2f5180 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -23,6 +23,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { name string requestedStart int64 // in milliseconds requestedEnd int64 // in milliseconds + requestedStep int64 // in seconds cachedSeries []*v3.Series expectedMiss []missInterval }{ @@ -30,6 +31,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { name: "cached time range is a subset of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -62,6 +64,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { name: "cached time range is a superset of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -93,6 +96,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { name: "cached time range is a left overlap of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -125,6 +129,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { name: "cached time range is a right overlap of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -157,6 +162,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { name: "cached time range is a disjoint of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -189,7 +195,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.cachedSeries, 0*time.Minute) + misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } @@ -211,6 +217,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { name string requestedStart int64 requestedEnd int64 + requestedStep int64 cachedSeries []*v3.Series fluxInterval time.Duration expectedMiss []missInterval @@ -219,6 +226,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { name: "cached time range is a subset of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -252,6 +260,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { name: "cached time range is a superset of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -284,6 +293,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { name: "cache time range is a left overlap of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -317,6 +327,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { name: "cache time range is a right overlap of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -350,6 +361,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { name: "cache time range is a disjoint of the requested time range", requestedStart: 1675115596722, requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, cachedSeries: []*v3.Series{ { Labels: map[string]string{ @@ -383,7 +395,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.cachedSeries, tc.fluxInterval) + misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } @@ -404,6 +416,7 @@ func TestQueryRange(t *testing.T) { { Start: 1675115596722, End: 1675115596722 + 120*60*1000, + Step: 60, CompositeQuery: &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, PanelType: v3.PanelTypeGraph, @@ -436,6 +449,7 @@ func TestQueryRange(t *testing.T) { { Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000, + Step: 60, CompositeQuery: &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, PanelType: v3.PanelTypeGraph, diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 453c6475a8..968fe188e6 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -680,6 +680,35 @@ func (s *Series) SortPoints() { }) } +func (s *Series) RemoveDuplicatePoints() { + if len(s.Points) == 0 { + return + } + + // priortize the last point + // this is to handle the case where the same point is sent twice + // the last point is the most recent point adjusted for the flux interval + + newPoints := make([]Point, 0) + for i := len(s.Points) - 1; i >= 0; i-- { + if len(newPoints) == 0 { + newPoints = append(newPoints, s.Points[i]) + continue + } + if newPoints[len(newPoints)-1].Timestamp != s.Points[i].Timestamp { + newPoints = append(newPoints, s.Points[i]) + } + } + + // reverse the points + for i := len(newPoints)/2 - 1; i >= 0; i-- { + opp := len(newPoints) - 1 - i + newPoints[i], newPoints[opp] = newPoints[opp], newPoints[i] + } + + s.Points = newPoints +} + type Row struct { Timestamp time.Time `json:"timestamp"` Data map[string]interface{} `json:"data"`