From 9a3c49bce47de20e68a4daccc24efa9670b00034 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Thu, 13 Mar 2025 10:04:06 +0530 Subject: [PATCH] Store complete intervals in cache and update logic for response (#7212) * fix: new implementation for finding missing timerange * fix: remove unwanted code * fix: update if condition * fix: update logic and the test cases * fix: correct name * fix: filter points which are not a complete agg interval * fix: fix the logic to use the points correctly * fix: fix overlapping test case * fix: add comments * Update pkg/query-service/querycache/query_range_cache.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix: use step ms * fix: use step ms * fix: tests * fix: update logic to handle actual empty series * fix: name updated * Update pkg/query-service/app/querier/v2/helper.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix: address comments * fix: address comments * fix: address comments * Update pkg/query-service/common/query_range.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix: add error log * fix: handle case where end is equal to a complete window end * fix: added comments * fix: address comments * fix: move function to common query range --------- Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> --- pkg/query-service/app/querier/helper.go | 24 +- pkg/query-service/app/querier/v2/helper.go | 25 +- pkg/query-service/common/query_range.go | 106 +++++ pkg/query-service/common/query_range_test.go | 435 ++++++++++++++++++ pkg/query-service/interfaces/interface.go | 2 + .../querycache/query_range_cache.go | 22 +- 6 files changed, 602 insertions(+), 12 deletions(-) create mode 100644 pkg/query-service/common/query_range_test.go diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index a82b3f815b..abcc5a066c 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -122,6 +122,7 @@ func (q *querier) runBuilderQuery( misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName]) zap.L().Info("cache misses for logs query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) + filteredMissedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM) if err != nil { @@ -138,15 +139,32 @@ func (q *querier) runBuilderQuery( } return } + filteredSeries, startTime, endTime := common.FilterSeriesPoints(series, miss.Start, miss.End, builderQuery.StepInterval) + + // making sure that empty range doesn't doesn't enter the cache + // empty results from filteredSeries means data was filtered out, but empty series means actual empty data + if len(filteredSeries) > 0 || len(series) == 0 { + filteredMissedSeries = append(filteredMissedSeries, querycache.CachedSeriesData{ + Data: filteredSeries, + Start: startTime, + End: endTime, + }) + } + + // for the actual response missedSeries = append(missedSeries, querycache.CachedSeriesData{ + Data: series, Start: miss.Start, End: miss.End, - Data: series, }) } - mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries) - resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end) + filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], filteredMissedSeries) + q.queryCache.StoreSeriesInCache(cacheKeys[queryName], filteredMergedSeries) + + mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], missedSeries) + + resultSeries := common.GetSeriesFromCachedDataV2(mergedSeries, start, end, builderQuery.StepInterval) ch <- channelResult{ Err: nil, diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 4846aa3f3b..bf43cbc18a 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -119,9 +119,10 @@ func (q *querier) runBuilderQuery( ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} return } - misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName]) + misses := q.queryCache.FindMissingTimeRangesV2(start, end, builderQuery.StepInterval, cacheKeys[queryName]) zap.L().Info("cache misses for logs query", zap.Any("misses", misses)) missedSeries := make([]querycache.CachedSeriesData, 0) + filteredMissedSeries := make([]querycache.CachedSeriesData, 0) for _, miss := range misses { query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM) if err != nil { @@ -138,15 +139,33 @@ func (q *querier) runBuilderQuery( } return } + + filteredSeries, startTime, endTime := common.FilterSeriesPoints(series, miss.Start, miss.End, builderQuery.StepInterval) + + // making sure that empty range doesn't doesn't enter the cache + // empty results from filteredSeries means data was filtered out, but empty series means actual empty data + if len(filteredSeries) > 0 || len(series) == 0 { + filteredMissedSeries = append(filteredMissedSeries, querycache.CachedSeriesData{ + Data: filteredSeries, + Start: startTime, + End: endTime, + }) + } + + // for the actual response missedSeries = append(missedSeries, querycache.CachedSeriesData{ Data: series, Start: miss.Start, End: miss.End, }) } - mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries) - resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end) + filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], filteredMissedSeries) + q.queryCache.StoreSeriesInCache(cacheKeys[queryName], filteredMergedSeries) + + mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], missedSeries) + + resultSeries := common.GetSeriesFromCachedDataV2(mergedSeries, start, end, builderQuery.StepInterval) ch <- channelResult{ Err: nil, diff --git a/pkg/query-service/common/query_range.go b/pkg/query-service/common/query_range.go index 598ac1a21c..4a89384862 100644 --- a/pkg/query-service/common/query_range.go +++ b/pkg/query-service/common/query_range.go @@ -3,6 +3,7 @@ package common import ( "math" "regexp" + "sort" "time" "unicode" @@ -123,3 +124,108 @@ func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int6 } return newSeries } + +// It is different from GetSeriesFromCachedData because doesn't remove a point if it is >= (start - (start % step*1000)) +func GetSeriesFromCachedDataV2(data []querycache.CachedSeriesData, start, end, step int64) []*v3.Series { + series := make(map[uint64]*v3.Series) + + for _, cachedData := range data { + for _, data := range cachedData.Data { + h := labels.FromMap(data.Labels).Hash() + + if _, ok := series[h]; !ok { + series[h] = &v3.Series{ + Labels: data.Labels, + LabelsArray: data.LabelsArray, + Points: make([]v3.Point, 0), + } + } + + for _, point := range data.Points { + if point.Timestamp >= (start-(start%(step*1000))) && point.Timestamp <= end { + series[h].Points = append(series[h].Points, point) + } + } + } + } + + newSeries := make([]*v3.Series, 0, len(series)) + for _, s := range series { + s.SortPoints() + s.RemoveDuplicatePoints() + newSeries = append(newSeries, s) + } + return newSeries +} + +// filter series points for storing in cache +func FilterSeriesPoints(seriesList []*v3.Series, missStart, missEnd int64, stepInterval int64) ([]*v3.Series, int64, int64) { + filteredSeries := make([]*v3.Series, 0) + startTime := missStart + endTime := missEnd + + stepMs := stepInterval * 1000 + + // return empty series if the interval is not complete + if missStart+stepMs > missEnd { + return []*v3.Series{}, missStart, missEnd + } + + // if the end time is not a complete aggregation window, then we will have to adjust the end time + // to the previous complete aggregation window end + endCompleteWindow := missEnd%stepMs == 0 + if !endCompleteWindow { + endTime = missEnd - (missEnd % stepMs) + } + + // if the start time is not a complete aggregation window, then we will have to adjust the start time + // to the next complete aggregation window + if missStart%stepMs != 0 { + startTime = missStart + stepMs - (missStart % stepMs) + } + + for _, series := range seriesList { + // if data for the series is empty, then we will add it to the cache + if len(series.Points) == 0 { + filteredSeries = append(filteredSeries, &v3.Series{ + Labels: series.Labels, + LabelsArray: series.LabelsArray, + Points: make([]v3.Point, 0), + }) + continue + } + + // Sort the points based on timestamp + sort.Slice(series.Points, func(i, j int) bool { + return series.Points[i].Timestamp < series.Points[j].Timestamp + }) + + points := make([]v3.Point, len(series.Points)) + copy(points, series.Points) + + // Filter the first point that is not a complete aggregation window + if series.Points[0].Timestamp < missStart { + // Remove the first point + points = points[1:] + } + + // filter the last point if it is not a complete aggregation window + // adding or condition to handle the end time is equal to a complete window end https://github.com/SigNoz/signoz/pull/7212#issuecomment-2703677190 + if (!endCompleteWindow && series.Points[len(series.Points)-1].Timestamp == missEnd-(missEnd%stepMs)) || + (endCompleteWindow && series.Points[len(series.Points)-1].Timestamp == missEnd) { + // Remove the last point + points = points[:len(points)-1] + } + + // making sure that empty range doesn't enter the cache + if len(points) > 0 { + filteredSeries = append(filteredSeries, &v3.Series{ + Labels: series.Labels, + LabelsArray: series.LabelsArray, + Points: points, + }) + } + } + + return filteredSeries, startTime, endTime +} diff --git a/pkg/query-service/common/query_range_test.go b/pkg/query-service/common/query_range_test.go new file mode 100644 index 0000000000..03779996fe --- /dev/null +++ b/pkg/query-service/common/query_range_test.go @@ -0,0 +1,435 @@ +package common + +import ( + "testing" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/querycache" +) + +func TestFilterSeriesPoints(t *testing.T) { + // Define test cases + testCases := []struct { + name string + seriesList []*v3.Series + missStart int64 // in milliseconds + missEnd int64 // in milliseconds + stepInterval int64 // in seconds + expectedPoints []*v3.Series + expectedStart int64 // in milliseconds + expectedEnd int64 // in milliseconds + }{ + { + name: "Complete aggregation window", + missStart: 1609459200000, // 01 Jan 2021 00:00:00 UTC + missEnd: 1609466400000, // 01 Jan 2021 02:00:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609459200000, Value: 1.0}, // 01 Jan 2021 00:00:00 UTC + {Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC + {Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC + }, + }, + }, + expectedPoints: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609459200000, Value: 1.0}, + {Timestamp: 1609462800000, Value: 2.0}, + }, + }, + }, + expectedStart: 1609459200000, + expectedEnd: 1609466400000, + }, + { + name: "Filter first point", + missStart: 1609464600000, // 01 Jan 2021 01:30:00 UTC + missEnd: 1609470000000, // 01 Jan 2021 03:00:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC + {Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC + }, + }, + }, + expectedPoints: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 3.0}, + }, + }, + }, + expectedStart: 1609466400000, + expectedEnd: 1609470000000, + }, + { + name: "Filter last point", + missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC + missEnd: 1609471800000, // 01 Jan 2021 03:30:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 3.0}, // 01 Jan 2021 03:00:00 UTC + }, + }, + }, + expectedPoints: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 3.0}, + }, + }, + }, + expectedStart: 1609466400000, + expectedEnd: 1609470000000, + }, + { + name: "Incomplete aggregation window", + missStart: 1609470000000, // 01 Jan 2021 03:00:00 UTC + missEnd: 1609471800000, // 01 Jan 2021 03:30:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{}, + }, + }, + expectedPoints: []*v3.Series{}, + expectedStart: 1609470000000, + expectedEnd: 1609471800000, + }, + { + name: "Filter first point with multiple series", + missStart: 1609464600000, // 01 Jan 2021 01:30:00 UTC + missEnd: 1609477200000, // 01 Jan 2021 05:00:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC + {Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC + {Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC + }, + }, + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 7.0}, // 01 Jan 2021 03:00:00 UTC + {Timestamp: 1609473600000, Value: 8.0}, // 01 Jan 2021 04:00:00 UTC + }, + }, + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC + {Timestamp: 1609473600000, Value: 11.0}, // 01 Jan 2021 04:00:00 UTC + }, + }, + }, + expectedPoints: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC + {Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC + }, + }, + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 7.0}, // 01 Jan 2021 03:00:00 UTC + {Timestamp: 1609473600000, Value: 8.0}, // 01 Jan 2021 04:00:00 UTC + }, + }, + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC + {Timestamp: 1609473600000, Value: 11.0}, // 01 Jan 2021 04:00:00 UTC + }, + }, + }, + expectedStart: 1609466400000, + expectedEnd: 1609477200000, + }, + { + name: "Filter last point", + missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC + missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC + {Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC + }, + }, + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC + }, + }, + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC + }, + }, + }, + expectedPoints: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC + }, + }, + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC + }, + }, + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC + }, + }, + }, + expectedStart: 1609466400000, + expectedEnd: 1609473600000, + }, + { + name: "half range should return empty result", + missStart: 1609473600000, // 01 Jan 2021 04:00:00 UTC + missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609473600000, Value: 1.0}, // 01 Jan 2021 04:00:00 UTC + }, + }, + }, + expectedPoints: []*v3.Series{}, + expectedStart: 1609473600000, + expectedEnd: 1609475400000, + }, + { + name: "respect actual empty series", + missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC + missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{}, + }, + }, + expectedPoints: []*v3.Series{ + { + Points: []v3.Point{}, + }, + }, + expectedStart: 1609466400000, + expectedEnd: 1609473600000, + }, + { + name: "Remove point that is not a complete aggregation window", + missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC + missEnd: 1609470000000, // 01 Jan 2021 03:00:00 UTC + stepInterval: 3600, // 1 hour + seriesList: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 2.0}, // 01 Jan 2021 02:00:00 UTC + {Timestamp: 1609470000000, Value: 3.0}, // 01 Jan 2021 03:00:00 UTC + }, + }, + }, + expectedPoints: []*v3.Series{ + { + Points: []v3.Point{ + {Timestamp: 1609466400000, Value: 2.0}, + }, + }, + }, + expectedStart: 1609466400000, + expectedEnd: 1609470000000, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + filteredSeries, startTime, endTime := FilterSeriesPoints(tc.seriesList, tc.missStart, tc.missEnd, tc.stepInterval) + + if len(tc.expectedPoints) != len(filteredSeries) { + t.Errorf("Expected %d series, got %d", len(tc.expectedPoints), len(filteredSeries)) + return + } + + for i := range tc.expectedPoints { + if len(tc.expectedPoints[i].Points) != len(filteredSeries[i].Points) { + t.Errorf("Series %d: Expected %d points, got %d\nExpected points: %+v\nGot points: %+v", + i, + len(tc.expectedPoints[i].Points), + len(filteredSeries[i].Points), + tc.expectedPoints[i].Points, + filteredSeries[i].Points) + continue + } + + for j := range tc.expectedPoints[i].Points { + if tc.expectedPoints[i].Points[j].Timestamp != filteredSeries[i].Points[j].Timestamp { + t.Errorf("Series %d Point %d: Expected timestamp %d, got %d", i, j, tc.expectedPoints[i].Points[j].Timestamp, filteredSeries[i].Points[j].Timestamp) + } + if tc.expectedPoints[i].Points[j].Value != filteredSeries[i].Points[j].Value { + t.Errorf("Series %d Point %d: Expected value %f, got %f", i, j, tc.expectedPoints[i].Points[j].Value, filteredSeries[i].Points[j].Value) + } + } + } + + if tc.expectedStart != startTime { + t.Errorf("Expected start time %d, got %d", tc.expectedStart, startTime) + } + if tc.expectedEnd != endTime { + t.Errorf("Expected end time %d, got %d", tc.expectedEnd, endTime) + } + }) + } +} + +func TestGetSeriesFromCachedData(t *testing.T) { + testCases := []struct { + name string + data []querycache.CachedSeriesData + start int64 + end int64 + expectedCount int + expectedPoints int + }{ + { + name: "Single point outside range", + data: []querycache.CachedSeriesData{ + { + Data: []*v3.Series{ + { + Labels: map[string]string{"label1": "value1"}, + Points: []v3.Point{ + {Timestamp: 1609473600000, Value: 1.0}, + }, + }, + }, + }, + }, + start: 1609475400000, // 01 Jan 2021 04:30:00 UTC + end: 1609477200000, // 01 Jan 2021 05:00:00 UTC + expectedCount: 1, + expectedPoints: 0, + }, + { + name: "Single point inside range", + data: []querycache.CachedSeriesData{ + { + Data: []*v3.Series{ + { + Labels: map[string]string{"label1": "value1"}, + Points: []v3.Point{ + {Timestamp: 1609476000000, Value: 1.0}, + }, + }, + }, + }, + }, + start: 1609475400000, // 01 Jan 2021 04:30:00 UTC + end: 1609477200000, // 01 Jan 2021 05:00:00 UTC + expectedCount: 1, + expectedPoints: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + series := GetSeriesFromCachedData(tc.data, tc.start, tc.end) + + if len(series) != tc.expectedCount { + t.Errorf("Expected %d series, got %d", tc.expectedCount, len(series)) + } + if len(series[0].Points) != tc.expectedPoints { + t.Errorf("Expected %d points, got %d", tc.expectedPoints, len(series[0].Points)) + } + }) + } +} + +func TestGetSeriesFromCachedDataV2(t *testing.T) { + testCases := []struct { + name string + data []querycache.CachedSeriesData + start int64 + end int64 + step int64 + expectedCount int + expectedPoints int + }{ + { + name: "Single point outside range", + data: []querycache.CachedSeriesData{ + { + Data: []*v3.Series{ + { + Labels: map[string]string{"label1": "value1"}, + Points: []v3.Point{ + {Timestamp: 1609473600000, Value: 1.0}, + }, + }, + }, + }, + }, + start: 1609475400000, + end: 1609477200000, + step: 1000, + expectedCount: 1, + expectedPoints: 0, + }, + { + name: "Single point inside range", + data: []querycache.CachedSeriesData{ + { + Data: []*v3.Series{ + { + Labels: map[string]string{"label1": "value1"}, + Points: []v3.Point{ + {Timestamp: 1609476000000, Value: 1.0}, + }, + }, + }, + }, + }, + start: 1609475400000, + end: 1609477200000, + step: 1000, + expectedCount: 1, + expectedPoints: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + series := GetSeriesFromCachedDataV2(tc.data, tc.start, tc.end, tc.step) + + if len(series) != tc.expectedCount { + t.Errorf("Expected %d series, got %d", tc.expectedCount, len(series)) + } + if len(series[0].Points) != tc.expectedPoints { + t.Errorf("Expected %d points, got %d", tc.expectedPoints, len(series[0].Points)) + } + }) + } +} diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 9457a4cb5d..7b360ea19e 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -149,4 +149,6 @@ type QueryCache interface { FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []querycache.MissInterval MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData + StoreSeriesInCache(cacheKey string, series []querycache.CachedSeriesData) + MergeWithCachedSeriesDataV2(cacheKey string, series []querycache.CachedSeriesData) []querycache.CachedSeriesData } diff --git a/pkg/query-service/querycache/query_range_cache.go b/pkg/query-service/querycache/query_range_cache.go index 7d636c037d..4394b1936d 100644 --- a/pkg/query-service/querycache/query_range_cache.go +++ b/pkg/query-service/querycache/query_range_cache.go @@ -264,6 +264,9 @@ func (q *queryCache) mergeSeries(cachedSeries, missedSeries []*v3.Series) []*v3. } func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesData) { + if q.cache == nil { + return + } mergedDataJSON, err := json.Marshal(mergedData) if err != nil { zap.L().Error("error marshalling merged data", zap.Error(err)) @@ -275,8 +278,7 @@ func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesD } } -func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData { - +func (q *queryCache) MergeWithCachedSeriesDataV2(cacheKey string, newData []CachedSeriesData) []CachedSeriesData { if q.cache == nil { return newData } @@ -284,8 +286,7 @@ func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []Cached cachedData, _, _ := q.cache.Retrieve(cacheKey, true) var existingData []CachedSeriesData if err := json.Unmarshal(cachedData, &existingData); err != nil { - // In case of error, we return the entire range as a miss - q.storeMergedData(cacheKey, newData) + zap.L().Error("error unmarshalling existing data", zap.Error(err)) return newData } @@ -330,7 +331,16 @@ func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []Cached mergedData = append(mergedData, *current) } - q.storeMergedData(cacheKey, mergedData) - return mergedData } + +func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData { + + mergedData := q.MergeWithCachedSeriesDataV2(cacheKey, newData) + q.storeMergedData(cacheKey, mergedData) + return mergedData +} + +func (q *queryCache) StoreSeriesInCache(cacheKey string, series []CachedSeriesData) { + q.storeMergedData(cacheKey, series) +}