From 1066b217cb0e9e9d5b521dd9a8656c75002b685c Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Wed, 4 Sep 2024 10:35:13 +0530 Subject: [PATCH] fix: fix logic for cache (#5811) * fix: fix logic for cache * fix: replace cache during error * fix: add todo comment for replaceCachedData --------- Co-authored-by: Srikanth Chekuri --- pkg/query-service/app/querier/helper.go | 15 +++++++++--- pkg/query-service/app/querier/querier.go | 19 +++++++++++---- pkg/query-service/app/querier/querier_test.go | 21 ++++++++++------- pkg/query-service/app/querier/v2/helper.go | 12 +++++++--- pkg/query-service/app/querier/v2/querier.go | 22 +++++++++++++----- .../app/querier/v2/querier_test.go | 23 +++++++++++-------- 6 files changed, 78 insertions(+), 34 deletions(-) diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index 1da4a5a46a..7c45cc8781 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -122,7 +122,7 @@ func (q *querier) runBuilderQuery( cachedData = data } } - misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) + misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -147,6 +147,9 @@ func (q *querier) runBuilderQuery( zap.L().Error("error unmarshalling cached data", zap.Error(err)) } mergedSeries := mergeSerieses(cachedSeries, missedSeries) + if replaceCachedData { + mergedSeries = missedSeries + } var mergedSeriesData []byte var marshallingErr error @@ -257,7 +260,7 @@ func (q *querier) runBuilderQuery( cachedData = data } } - misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) + misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -294,6 +297,9 @@ func (q *querier) runBuilderQuery( zap.L().Error("error unmarshalling cached data", zap.Error(err)) } mergedSeries := mergeSerieses(cachedSeries, missedSeries) + if replaceCachedData { + mergedSeries = missedSeries + } var mergedSeriesData []byte var marshallingErr error missedSeriesLen := len(missedSeries) @@ -360,7 +366,7 @@ func (q *querier) runBuilderExpression( } } step := postprocess.StepIntervalForFunction(params, queryName) - misses := q.findMissingTimeRanges(params.Start, params.End, step, cachedData) + misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, step, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -384,6 +390,9 @@ func (q *querier) runBuilderExpression( zap.L().Error("error unmarshalling cached data", zap.Error(err)) } mergedSeries := mergeSerieses(cachedSeries, missedSeries) + if replaceCachedData { + mergedSeries = missedSeries + } var mergedSeriesData []byte missedSeriesLen := len(missedSeries) diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 64e4a33ed2..86a77da114 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -149,7 +149,12 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangePar // // The [End - fluxInterval, End] is always added to the list of misses, because // the data might still be in flux and not yet available in the database. -func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval) { +// +// replaceCacheData is used to indicate if the cache data should be replaced instead of merging +// with the new data +// TODO: Remove replaceCacheData with a better logic +func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval, replaceCacheData bool) { + replaceCacheData = false var cachedStart, cachedEnd int64 for idx := range seriesList { series := seriesList[idx] @@ -204,6 +209,7 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux // Case 5: Cached time range is a disjoint of the requested time range // Add a miss for the entire requested time range misses = append(misses, missInterval{start: start, end: end}) + replaceCacheData = true } // remove the struts with start > end @@ -214,16 +220,16 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux validMisses = append(validMisses, miss) } } - return validMisses + return validMisses, replaceCacheData } // findMissingTimeRanges finds the missing time ranges in the cached data // and returns them as a list of misses -func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval) { +func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval, replaceCachedData bool) { var cachedSeriesList []*v3.Series if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil { // In case of error, we return the entire range as a miss - return []missInterval{{start: start, end: end}} + return []missInterval{{start: start, end: end}}, true } return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval) } @@ -355,7 +361,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam cachedData = data } } - misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -372,6 +378,9 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam zap.L().Error("error unmarshalling cached data", zap.Error(err)) } mergedSeries := mergeSerieses(cachedSeries, missedSeries) + if replaceCachedData { + mergedSeries = missedSeries + } channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries} diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index 962ca3832a..aecb7b27ba 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -20,12 +20,13 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { // 4. Cached time range is a right overlap of the requested time range // 5. Cached time range is a disjoint of the requested time range testCases := []struct { - name string - requestedStart int64 // in milliseconds - requestedEnd int64 // in milliseconds - requestedStep int64 // in seconds - cachedSeries []*v3.Series - expectedMiss []missInterval + name string + requestedStart int64 // in milliseconds + requestedEnd int64 // in milliseconds + requestedStep int64 // in seconds + cachedSeries []*v3.Series + expectedMiss []missInterval + replaceCachedData bool }{ { name: "cached time range is a subset of the requested time range", @@ -190,15 +191,19 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { end: 1675115596722 + 180*60*1000, }, }, + replaceCachedData: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) + misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } + if replaceCachedData != tc.replaceCachedData { + t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData) + } for i, miss := range misses { if miss.start != tc.expectedMiss[i].start { t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) @@ -395,7 +400,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) + misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 9ee90fb913..de9d591f7f 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -123,7 +123,7 @@ func (q *querier) runBuilderQuery( cachedData = data } } - misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) + misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -148,7 +148,9 @@ func (q *querier) runBuilderQuery( zap.L().Error("error unmarshalling cached data", zap.Error(err)) } mergedSeries := mergeSerieses(cachedSeries, missedSeries) - + if replaceCachedData { + mergedSeries = missedSeries + } var mergedSeriesData []byte var marshallingErr error missedSeriesLen := len(missedSeries) @@ -257,7 +259,7 @@ func (q *querier) runBuilderQuery( cachedData = data } } - misses := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) + misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -294,6 +296,10 @@ func (q *querier) runBuilderQuery( zap.L().Error("error unmarshalling cached data", zap.Error(err)) } mergedSeries := mergeSerieses(cachedSeries, missedSeries) + if replaceCachedData { + mergedSeries = missedSeries + } + var mergedSeriesData []byte var marshallingErr error missedSeriesLen := len(missedSeries) diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index 5e0c18afb5..d0c3a77d13 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -153,7 +153,12 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangePar // // The [End - fluxInterval, End] is always added to the list of misses, because // the data might still be in flux and not yet available in the database. -func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval) { +// +// replaceCacheData is used to indicate if the cache data should be replaced instead of merging +// with the new data +// TODO: Remove replaceCacheData with a better logic +func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval, replaceCacheData bool) { + replaceCacheData = false var cachedStart, cachedEnd int64 for idx := range seriesList { series := seriesList[idx] @@ -168,6 +173,8 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux } } + // time.Now is used because here we are considering the case where data might not + // be fully ingested for last (fluxInterval) minutes endMillis := time.Now().UnixMilli() adjustStep := int64(math.Min(float64(step), 60)) roundedMillis := endMillis - (endMillis % (adjustStep * 1000)) @@ -206,6 +213,7 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux // Case 5: Cached time range is a disjoint of the requested time range // Add a miss for the entire requested time range misses = append(misses, missInterval{start: start, end: end}) + replaceCacheData = true } // remove the struts with start > end @@ -216,16 +224,16 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux validMisses = append(validMisses, miss) } } - return validMisses + return validMisses, replaceCacheData } // findMissingTimeRanges finds the missing time ranges in the cached data // and returns them as a list of misses -func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval) { +func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval, replaceCachedData bool) { var cachedSeriesList []*v3.Series if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil { // In case of error, we return the entire range as a miss - return []missInterval{{start: start, end: end}} + return []missInterval{{start: start, end: end}}, true } return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval) } @@ -363,7 +371,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam cachedData = data } } - misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) missedSeries := make([]*v3.Series, 0) cachedSeries := make([]*v3.Series, 0) for _, miss := range misses { @@ -380,7 +388,9 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam zap.L().Error("error unmarshalling cached data", zap.Error(err)) } mergedSeries := mergeSerieses(cachedSeries, missedSeries) - + if replaceCachedData { + mergedSeries = missedSeries + } channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries} // Cache the seriesList for future queries diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index b8309c68ff..5707e9f70d 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -12,7 +12,7 @@ import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) -func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) { +func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { // There are five scenarios: // 1. Cached time range is a subset of the requested time range // 2. Cached time range is a superset of the requested time range @@ -20,12 +20,13 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) { // 4. Cached time range is a right overlap of the requested time range // 5. Cached time range is a disjoint of the requested time range testCases := []struct { - name string - requestedStart int64 // in milliseconds - requestedEnd int64 // in milliseconds - requestedStep int64 // in seconds - cachedSeries []*v3.Series - expectedMiss []missInterval + name string + requestedStart int64 // in milliseconds + requestedEnd int64 // in milliseconds + requestedStep int64 // in seconds + cachedSeries []*v3.Series + expectedMiss []missInterval + replaceCachedData bool }{ { name: "cached time range is a subset of the requested time range", @@ -190,15 +191,19 @@ func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) { end: 1675115596722 + 180*60*1000, }, }, + replaceCachedData: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) + misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) } + if replaceCachedData != tc.replaceCachedData { + t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData) + } for i, miss := range misses { if miss.start != tc.expectedMiss[i].start { t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) @@ -395,7 +400,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) + misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) if len(misses) != len(tc.expectedMiss) { t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) }