fix: new implementation for finding missing timerange (#7201)

* fix: new implementation for finding missing timerange

* fix: remove unwanted code

* fix: update if condition

* fix: update logic and the test cases

* fix: correct name

* fix: fix overlapping test case

* fix: add comments

* Update pkg/query-service/querycache/query_range_cache.go

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* fix: use step ms

* fix: add one more test case

* fix: test case

* fix: merge missing ranger

* Update pkg/query-service/querycache/query_range_cache.go

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

---------

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
Nityananda Gohain 2025-03-05 20:26:37 +05:30 committed by GitHub
parent 02865cf49e
commit b1e3f03bb5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 445 additions and 0 deletions

View File

@ -147,5 +147,6 @@ type Querier interface {
type QueryCache interface {
FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval
FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []querycache.MissInterval
MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData
}

View File

@ -49,6 +49,108 @@ func WithFluxInterval(fluxInterval time.Duration) QueryCacheOption {
}
}
// FindMissingTimeRangesV2 is a new correct implementation of FindMissingTimeRanges
// It takes care of any timestamps that were not queried due to rounding in the first version.
func (q *queryCache) FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []MissInterval {
if q.cache == nil || cacheKey == "" {
return []MissInterval{{Start: start, End: end}}
}
stepMs := step * 1000
// when the window is too small to be cached, we return the entire range as a miss
if (start + stepMs) > end {
return []MissInterval{{Start: start, End: end}}
}
cachedSeriesDataList := q.getCachedSeriesData(cacheKey)
// Sort the cached data by start time
sort.Slice(cachedSeriesDataList, func(i, j int) bool {
return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start
})
zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList)))
// Exclude the flux interval from the cached end time
// Why do we use `time.Now()` here?
// When querying for a range [start, now())
// we don't want to use the cached data inside the flux interval period
// because the data in the flux interval period might not be fully ingested
// and should not be used for caching.
// This is not an issue if the end time is before now() - fluxInterval
if len(cachedSeriesDataList) > 0 {
lastCachedData := cachedSeriesDataList[len(cachedSeriesDataList)-1]
lastCachedData.End = int64(
math.Min(
float64(lastCachedData.End),
float64(time.Now().UnixMilli()-q.fluxInterval.Milliseconds()),
),
)
}
var missingRanges []MissInterval
currentTime := start
// check if start is a complete aggregation window if not then add it as a miss
if start%stepMs != 0 {
nextAggStart := start - (start % stepMs) + stepMs
missingRanges = append(missingRanges, MissInterval{Start: start, End: nextAggStart})
currentTime = nextAggStart
}
for _, data := range cachedSeriesDataList {
// Ignore cached data that ends before the start time
if data.End <= start {
continue
}
// Stop processing if we've reached the end time
if data.Start >= end {
break
}
// Add missing range if there's a gap
if currentTime < data.Start {
missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: min(data.Start, end)})
}
// Update currentTime, but don't go past the end time
currentTime = max(currentTime, min(data.End, end))
}
// while iterating through the cachedSeriesDataList, we might have reached the end
// but there might be a case where the last data range is not a complete aggregation window
// so we add it manually by first checking if currentTime < end which means it has not reached the end
// and then checking if end%(step*1000) != 0 which means it is not a complete aggregation window but currentTime becomes end.
// that can happen when currentTime = nextAggStart and no other range match is found in the loop.
// The test case "start lies near the start of aggregation interval and end lies near the end of another aggregation interval"
// shows this case.
if currentTime < end {
missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: end})
} else if end%stepMs != 0 {
// check if end is a complete aggregation window if not then add it as a miss
prevAggEnd := end - (end % stepMs)
missingRanges = append(missingRanges, MissInterval{Start: prevAggEnd, End: end})
}
// Merge overlapping or adjacent missing ranges
if len(missingRanges) <= 1 {
return missingRanges
}
merged := []MissInterval{missingRanges[0]}
for _, curr := range missingRanges[1:] {
last := &merged[len(merged)-1]
if last.End >= curr.Start {
last.End = max(last.End, curr.End)
} else {
merged = append(merged, curr)
}
}
return merged
}
func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey string) []MissInterval {
if q.cache == nil || cacheKey == "" {
return []MissInterval{{Start: start, End: end}}

View File

@ -232,6 +232,348 @@ func TestFindMissingTimeRanges(t *testing.T) {
}
}
func TestFindMissingTimeRangesV2(t *testing.T) {
// Initialize the mock cache
mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
// Create a queryCache instance with the mock cache and a fluxInterval
q := querycache.NewQueryCache(
querycache.WithCache(mockCache),
querycache.WithFluxInterval(0), // Set to zero for testing purposes
)
// Define the test cases
testCases := []struct {
name string
requestedStart int64 // in milliseconds
requestedEnd int64 // in milliseconds
step int64 // in seconds
cacheKey string
cachedData []querycache.CachedSeriesData
expectedMiss []querycache.MissInterval
}{
{
name: "Cached time range is a subset of the requested time range",
requestedStart: 1738404000000, // 01 Feb 2025 10:00:00
requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey1",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738576800000, // 03 Feb 2025 10:00:00
End: 1738749600000, // 05 Feb 2025 10:00:00
Data: []*v3.Series{}, // Data can be empty for this test
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738404000000, End: 1738576800000}, // 01 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00
{Start: 1738749600000, End: 1738836000000}, // 05 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00
},
},
{
name: "Cached time range is a superset of the requested time range",
requestedStart: 1738576800000, // 03 Feb 2025 10:00:00
requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey2",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738404000000, // 01 Feb 2025 10:00:00
End: 1738836000000, // 06 Feb 2025 10:00:00
Data: []*v3.Series{},
},
},
expectedMiss: nil,
},
{
name: "Cached time range is a left overlap of the requested time range",
requestedStart: 1738576800000, // 03 Feb 2025 10:00:00
requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey3",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738404000000, // 01 Feb 2025 10:00:00
End: 1738663200000, // 04 Feb 2025 10:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738663200000, End: 1738836000000}, // 04 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00
},
},
{
name: "Cached time range is a right overlap of the requested time range",
requestedStart: 1738404000000, // 01 Feb 2025 10:00:00
requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey4",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738490400000, // 02 Feb 2025 10:00:00
End: 1738663200000, // 04 Feb 2025 10:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00
},
},
{
name: "Cached time range is disjoint from the requested time range",
requestedStart: 1738404000000, // 01 Feb 2025 10:00:00
requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey5",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738836000000, // 06 Feb 2025 10:00:00
End: 1739008800000, // 08 Feb 2025 10:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738404000000, End: 1738576800000}, // 01 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00
},
},
// Additional test cases for non-overlapping cached data
{
name: "Multiple non-overlapping cached intervals within requested range",
requestedStart: 1738404000000, // 01 Feb 2025 10:00:00
requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey6",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738490400000, // 02 Feb 2025 10:00:00
End: 1738576800000, // 03 Feb 2025 10:00:00
Data: []*v3.Series{},
},
{
Start: 1738663200000, // 04 Feb 2025 10:00:00
End: 1738749600000, // 05 Feb 2025 10:00:00
Data: []*v3.Series{},
},
{
Start: 1738836000000, // 06 Feb 2025 10:00:00
End: 1738922400000, // 07 Feb 2025 10:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00
{Start: 1738576800000, End: 1738663200000}, // 03 Feb 2025 10:00:00 - 04 Feb 2025 10:00:00
{Start: 1738749600000, End: 1738836000000}, // 05 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00
},
},
{
name: "Cached intervals covering some parts with gaps",
requestedStart: 1738404000000, // 01 Feb 2025 10:00:00
requestedEnd: 1738490400000, // 02 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey7",
cachedData: []querycache.CachedSeriesData{
{Start: 1738404000000, End: 1738418400000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 14:00:00
{Start: 1738425600000, End: 1738432800000, Data: []*v3.Series{}}, // 01 Feb 2025 16:00:00 - 18:00:00
{Start: 1738440000000, End: 1738447200000, Data: []*v3.Series{}}, // 01 Feb 2025 20:00:00 - 22:00:00
{Start: 1738454400000, End: 1738461600000, Data: []*v3.Series{}}, // 02 Feb 2025 00:00:00 - 02:00:00
},
expectedMiss: []querycache.MissInterval{
// {Start: 1738404000000, End: 1738404060000}, // 01 Feb 2025 10:00:00 - 10:01:00
{Start: 1738418400000, End: 1738425600000}, // 01 Feb 2025 14:00:00 - 16:00:00
{Start: 1738432800000, End: 1738440000000}, // 01 Feb 2025 18:00:00 - 20:00:00
{Start: 1738447200000, End: 1738454400000}, // 01 Feb 2025 22:00:00 - 02 Feb 2025 00:00:00
{Start: 1738461600000, End: 1738490400000}, // 02 Feb 2025 02:00:00 - 10:00:00
},
},
{
name: "Non-overlapping cached intervals outside requested range",
requestedStart: 1738490400000, // 02 Feb 2025 10:00:00
requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey8",
cachedData: []querycache.CachedSeriesData{
{Start: 1738404000000, End: 1738447200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 22:00:00
{Start: 1738620000000, End: 1738663200000, Data: []*v3.Series{}}, // 03 Feb 2025 22:00:00 - 04 Feb 2025 10:00:00
},
expectedMiss: []querycache.MissInterval{
{Start: 1738490400000, End: 1738576800000}, // 02 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00
},
},
{
name: "No cached data at all",
requestedStart: 1738404000000, // 01 Feb 2025 10:00:00
requestedEnd: 1738490400000, // 02 Feb 2025 10:00:00
step: 60,
cacheKey: "testKey10",
cachedData: nil,
expectedMiss: []querycache.MissInterval{
{Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00
},
},
{
name: "Cached intervals with overlapping and non-overlapping mix",
requestedStart: 1738404000000, // 01 Feb 2025 10:00:00
requestedEnd: 1738407600000, // 01 Feb 2025 11:00:00
step: 60,
cacheKey: "testKey11",
cachedData: []querycache.CachedSeriesData{
{Start: 1738404000000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 10:20:00
{Start: 1738404600000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:10:00 - 10:20:00
{Start: 1738406100000, End: 1738406700000, Data: []*v3.Series{}}, // 01 Feb 2025 10:35:00 - 10:45:00
{Start: 1738407000000, End: 1738407300000, Data: []*v3.Series{}}, // 01 Feb 2025 10:50:00 - 10:55:00
},
expectedMiss: []querycache.MissInterval{
{Start: 1738405200000, End: 1738406100000}, // 01 Feb 2025 10:20:00 - 10:35:00
{Start: 1738406700000, End: 1738407000000}, // 01 Feb 2025 10:45:00 - 10:50:00
{Start: 1738407300000, End: 1738407600000}, // 01 Feb 2025 10:55:00 - 11:00:00
},
},
{
name: "Cached intervals covering the edges but missing middle",
requestedStart: 1738404000000, // 01 Feb 2025 10:00:00
requestedEnd: 1738407600000, // 01 Feb 2025 11:00:00
step: 60,
cacheKey: "testKey12",
cachedData: []querycache.CachedSeriesData{
{Start: 1738404000000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 10:20:00
{Start: 1738406400000, End: 1738407600000, Data: []*v3.Series{}}, // 01 Feb 2025 10:40:00 - 11:00:00
},
expectedMiss: []querycache.MissInterval{
{Start: 1738405200000, End: 1738406400000}, // 01 Feb 2025 10:20:00 - 10:40:00
},
},
{
name: "requested data is not one step/window",
requestedStart: 1738576800000,
requestedEnd: 1738576800001,
step: 60,
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
{Start: 1738576800000, End: 1738576860000, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{{Start: 1738576800000, End: 1738576800001}},
},
{
name: "requested data is exactly one step or aggregation window",
requestedStart: 1738576800000,
requestedEnd: 1738576860000,
step: 60,
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
{Start: 1738576800000, End: 1738576860000, Data: []*v3.Series{}},
},
expectedMiss: nil,
},
{
name: "start is between a cache aggregate interval and end outside of cache aggregate interval",
requestedStart: 1738576800000, // 03 Feb 2025 10:00:00
requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738576800000, End: 1738627200000}, // 03 Feb 2025 10:00:00 - 04 Feb 2025 00:00:00
{Start: 1738713600000, End: 1738749600000}, // 05 Feb 2025 00:00:00 - 05 Feb 2025 10:00:00
},
},
{
name: "start is the start of aggregate interval and end is between two aggregate intervals",
requestedStart: 1738540800000, // 03 Feb 2025 00:00:00
requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738713600000, End: 1738749600000}, // 05 Feb 2025 00:00:00 - 05 Feb 2025 10:00:00
},
},
{
name: "1. start lies near the start of aggregation interval and end lies near the end of another aggregation interval",
requestedStart: 1738541400000, // 03 Feb 2025 00:10:00
requestedEnd: 1738713000000, // 04 Feb 2025 11:50:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738541400000, End: 1738713000000}, // 03 Feb 2025 00:10:00 - 04 Feb 2025 11:50:00
},
},
{
name: "2. start lies near the start of aggregation interval and end lies near the end of another aggregation interval",
requestedStart: 1738411859000, // 01 Feb 2025 00:10:00
requestedEnd: 1738713000000, // 04 Feb 2025 11:50:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738411859000, End: 1738540800000}, // 01 Feb 2025 00:10:00 - 03 Feb 2025 00:00:00
{Start: 1738627200000, End: 1738713000000}, // 04 Feb 2025 00:00:00 - 04 Feb 2025 11:50:00
},
},
{
name: "start is before cache and end lies at the end of cache aggregation interval",
requestedStart: 1738498255000, // 02 Feb 2025 12:10:00
requestedEnd: 1738713600000, // 05 Feb 2025 00:00:00
step: 86400, // 24 hours
cacheKey: "testKey13",
cachedData: []querycache.CachedSeriesData{
{
Start: 1738540800000, // 03 Feb 2025 00:00:00
End: 1738713600000, // 05 Feb 2025 00:00:00
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1738498255000, End: 1738540800000}, // 03 Feb 2025 00:10:00 - 03 Feb 2025 00:00:00
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Store the cached data in the mock cache
if len(tc.cachedData) > 0 {
cachedDataJSON, err := json.Marshal(tc.cachedData)
assert.NoError(t, err)
err = mockCache.Store(tc.cacheKey, cachedDataJSON, 0)
assert.NoError(t, err)
}
// Call FindMissingTimeRanges
missingRanges := q.FindMissingTimeRangesV2(tc.requestedStart, tc.requestedEnd, tc.step, tc.cacheKey)
// Verify the missing ranges
assert.Equal(t, tc.expectedMiss, missingRanges)
})
}
}
func TestMergeWithCachedSeriesData(t *testing.T) {
// Initialize the mock cache
mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})