diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 04985390d7..9457a4cb5d 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -147,5 +147,6 @@ type Querier interface { type QueryCache interface { FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval + FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []querycache.MissInterval MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData } diff --git a/pkg/query-service/querycache/query_range_cache.go b/pkg/query-service/querycache/query_range_cache.go index b2bde35eb5..7d636c037d 100644 --- a/pkg/query-service/querycache/query_range_cache.go +++ b/pkg/query-service/querycache/query_range_cache.go @@ -49,6 +49,108 @@ func WithFluxInterval(fluxInterval time.Duration) QueryCacheOption { } } +// FindMissingTimeRangesV2 is a new correct implementation of FindMissingTimeRanges +// It takes care of any timestamps that were not queried due to rounding in the first version. +func (q *queryCache) FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []MissInterval { + if q.cache == nil || cacheKey == "" { + return []MissInterval{{Start: start, End: end}} + } + + stepMs := step * 1000 + + // when the window is too small to be cached, we return the entire range as a miss + if (start + stepMs) > end { + return []MissInterval{{Start: start, End: end}} + } + + cachedSeriesDataList := q.getCachedSeriesData(cacheKey) + + // Sort the cached data by start time + sort.Slice(cachedSeriesDataList, func(i, j int) bool { + return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start + }) + + zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList))) + + // Exclude the flux interval from the cached end time + + // Why do we use `time.Now()` here? + // When querying for a range [start, now()) + // we don't want to use the cached data inside the flux interval period + // because the data in the flux interval period might not be fully ingested + // and should not be used for caching. + // This is not an issue if the end time is before now() - fluxInterval + if len(cachedSeriesDataList) > 0 { + lastCachedData := cachedSeriesDataList[len(cachedSeriesDataList)-1] + lastCachedData.End = int64( + math.Min( + float64(lastCachedData.End), + float64(time.Now().UnixMilli()-q.fluxInterval.Milliseconds()), + ), + ) + } + + var missingRanges []MissInterval + currentTime := start + + // check if start is a complete aggregation window if not then add it as a miss + if start%stepMs != 0 { + nextAggStart := start - (start % stepMs) + stepMs + missingRanges = append(missingRanges, MissInterval{Start: start, End: nextAggStart}) + currentTime = nextAggStart + } + + for _, data := range cachedSeriesDataList { + // Ignore cached data that ends before the start time + if data.End <= start { + continue + } + // Stop processing if we've reached the end time + if data.Start >= end { + break + } + + // Add missing range if there's a gap + if currentTime < data.Start { + missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: min(data.Start, end)}) + } + + // Update currentTime, but don't go past the end time + currentTime = max(currentTime, min(data.End, end)) + } + + // while iterating through the cachedSeriesDataList, we might have reached the end + // but there might be a case where the last data range is not a complete aggregation window + // so we add it manually by first checking if currentTime < end which means it has not reached the end + // and then checking if end%(step*1000) != 0 which means it is not a complete aggregation window but currentTime becomes end. + // that can happen when currentTime = nextAggStart and no other range match is found in the loop. + // The test case "start lies near the start of aggregation interval and end lies near the end of another aggregation interval" + // shows this case. + if currentTime < end { + missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: end}) + } else if end%stepMs != 0 { + // check if end is a complete aggregation window if not then add it as a miss + prevAggEnd := end - (end % stepMs) + missingRanges = append(missingRanges, MissInterval{Start: prevAggEnd, End: end}) + } + + // Merge overlapping or adjacent missing ranges + if len(missingRanges) <= 1 { + return missingRanges + } + merged := []MissInterval{missingRanges[0]} + for _, curr := range missingRanges[1:] { + last := &merged[len(merged)-1] + if last.End >= curr.Start { + last.End = max(last.End, curr.End) + } else { + merged = append(merged, curr) + } + } + + return merged +} + func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey string) []MissInterval { if q.cache == nil || cacheKey == "" { return []MissInterval{{Start: start, End: end}} diff --git a/pkg/query-service/querycache/query_range_cache_test.go b/pkg/query-service/querycache/query_range_cache_test.go index c71ba13f10..7f8bf75256 100644 --- a/pkg/query-service/querycache/query_range_cache_test.go +++ b/pkg/query-service/querycache/query_range_cache_test.go @@ -232,6 +232,348 @@ func TestFindMissingTimeRanges(t *testing.T) { } } +func TestFindMissingTimeRangesV2(t *testing.T) { + // Initialize the mock cache + mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + + // Create a queryCache instance with the mock cache and a fluxInterval + q := querycache.NewQueryCache( + querycache.WithCache(mockCache), + querycache.WithFluxInterval(0), // Set to zero for testing purposes + ) + + // Define the test cases + testCases := []struct { + name string + requestedStart int64 // in milliseconds + requestedEnd int64 // in milliseconds + step int64 // in seconds + cacheKey string + cachedData []querycache.CachedSeriesData + expectedMiss []querycache.MissInterval + }{ + { + name: "Cached time range is a subset of the requested time range", + requestedStart: 1738404000000, // 01 Feb 2025 10:00:00 + requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey1", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738576800000, // 03 Feb 2025 10:00:00 + End: 1738749600000, // 05 Feb 2025 10:00:00 + Data: []*v3.Series{}, // Data can be empty for this test + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738404000000, End: 1738576800000}, // 01 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00 + {Start: 1738749600000, End: 1738836000000}, // 05 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00 + }, + }, + { + name: "Cached time range is a superset of the requested time range", + requestedStart: 1738576800000, // 03 Feb 2025 10:00:00 + requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey2", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738404000000, // 01 Feb 2025 10:00:00 + End: 1738836000000, // 06 Feb 2025 10:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: nil, + }, + { + name: "Cached time range is a left overlap of the requested time range", + requestedStart: 1738576800000, // 03 Feb 2025 10:00:00 + requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey3", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738404000000, // 01 Feb 2025 10:00:00 + End: 1738663200000, // 04 Feb 2025 10:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738663200000, End: 1738836000000}, // 04 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00 + }, + }, + { + name: "Cached time range is a right overlap of the requested time range", + requestedStart: 1738404000000, // 01 Feb 2025 10:00:00 + requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey4", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738490400000, // 02 Feb 2025 10:00:00 + End: 1738663200000, // 04 Feb 2025 10:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00 + }, + }, + { + name: "Cached time range is disjoint from the requested time range", + requestedStart: 1738404000000, // 01 Feb 2025 10:00:00 + requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey5", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738836000000, // 06 Feb 2025 10:00:00 + End: 1739008800000, // 08 Feb 2025 10:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738404000000, End: 1738576800000}, // 01 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00 + }, + }, + // Additional test cases for non-overlapping cached data + { + name: "Multiple non-overlapping cached intervals within requested range", + requestedStart: 1738404000000, // 01 Feb 2025 10:00:00 + requestedEnd: 1738836000000, // 06 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey6", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738490400000, // 02 Feb 2025 10:00:00 + End: 1738576800000, // 03 Feb 2025 10:00:00 + Data: []*v3.Series{}, + }, + { + Start: 1738663200000, // 04 Feb 2025 10:00:00 + End: 1738749600000, // 05 Feb 2025 10:00:00 + Data: []*v3.Series{}, + }, + { + Start: 1738836000000, // 06 Feb 2025 10:00:00 + End: 1738922400000, // 07 Feb 2025 10:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00 + {Start: 1738576800000, End: 1738663200000}, // 03 Feb 2025 10:00:00 - 04 Feb 2025 10:00:00 + {Start: 1738749600000, End: 1738836000000}, // 05 Feb 2025 10:00:00 - 06 Feb 2025 10:00:00 + }, + }, + { + name: "Cached intervals covering some parts with gaps", + requestedStart: 1738404000000, // 01 Feb 2025 10:00:00 + requestedEnd: 1738490400000, // 02 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey7", + cachedData: []querycache.CachedSeriesData{ + {Start: 1738404000000, End: 1738418400000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 14:00:00 + {Start: 1738425600000, End: 1738432800000, Data: []*v3.Series{}}, // 01 Feb 2025 16:00:00 - 18:00:00 + {Start: 1738440000000, End: 1738447200000, Data: []*v3.Series{}}, // 01 Feb 2025 20:00:00 - 22:00:00 + {Start: 1738454400000, End: 1738461600000, Data: []*v3.Series{}}, // 02 Feb 2025 00:00:00 - 02:00:00 + }, + expectedMiss: []querycache.MissInterval{ + // {Start: 1738404000000, End: 1738404060000}, // 01 Feb 2025 10:00:00 - 10:01:00 + {Start: 1738418400000, End: 1738425600000}, // 01 Feb 2025 14:00:00 - 16:00:00 + {Start: 1738432800000, End: 1738440000000}, // 01 Feb 2025 18:00:00 - 20:00:00 + {Start: 1738447200000, End: 1738454400000}, // 01 Feb 2025 22:00:00 - 02 Feb 2025 00:00:00 + {Start: 1738461600000, End: 1738490400000}, // 02 Feb 2025 02:00:00 - 10:00:00 + }, + }, + { + name: "Non-overlapping cached intervals outside requested range", + requestedStart: 1738490400000, // 02 Feb 2025 10:00:00 + requestedEnd: 1738576800000, // 03 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey8", + cachedData: []querycache.CachedSeriesData{ + {Start: 1738404000000, End: 1738447200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 22:00:00 + {Start: 1738620000000, End: 1738663200000, Data: []*v3.Series{}}, // 03 Feb 2025 22:00:00 - 04 Feb 2025 10:00:00 + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738490400000, End: 1738576800000}, // 02 Feb 2025 10:00:00 - 03 Feb 2025 10:00:00 + }, + }, + { + name: "No cached data at all", + requestedStart: 1738404000000, // 01 Feb 2025 10:00:00 + requestedEnd: 1738490400000, // 02 Feb 2025 10:00:00 + step: 60, + cacheKey: "testKey10", + cachedData: nil, + expectedMiss: []querycache.MissInterval{ + {Start: 1738404000000, End: 1738490400000}, // 01 Feb 2025 10:00:00 - 02 Feb 2025 10:00:00 + }, + }, + { + name: "Cached intervals with overlapping and non-overlapping mix", + requestedStart: 1738404000000, // 01 Feb 2025 10:00:00 + requestedEnd: 1738407600000, // 01 Feb 2025 11:00:00 + step: 60, + cacheKey: "testKey11", + cachedData: []querycache.CachedSeriesData{ + {Start: 1738404000000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 10:20:00 + {Start: 1738404600000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:10:00 - 10:20:00 + {Start: 1738406100000, End: 1738406700000, Data: []*v3.Series{}}, // 01 Feb 2025 10:35:00 - 10:45:00 + {Start: 1738407000000, End: 1738407300000, Data: []*v3.Series{}}, // 01 Feb 2025 10:50:00 - 10:55:00 + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738405200000, End: 1738406100000}, // 01 Feb 2025 10:20:00 - 10:35:00 + {Start: 1738406700000, End: 1738407000000}, // 01 Feb 2025 10:45:00 - 10:50:00 + {Start: 1738407300000, End: 1738407600000}, // 01 Feb 2025 10:55:00 - 11:00:00 + }, + }, + { + name: "Cached intervals covering the edges but missing middle", + requestedStart: 1738404000000, // 01 Feb 2025 10:00:00 + requestedEnd: 1738407600000, // 01 Feb 2025 11:00:00 + step: 60, + cacheKey: "testKey12", + cachedData: []querycache.CachedSeriesData{ + {Start: 1738404000000, End: 1738405200000, Data: []*v3.Series{}}, // 01 Feb 2025 10:00:00 - 10:20:00 + {Start: 1738406400000, End: 1738407600000, Data: []*v3.Series{}}, // 01 Feb 2025 10:40:00 - 11:00:00 + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738405200000, End: 1738406400000}, // 01 Feb 2025 10:20:00 - 10:40:00 + }, + }, + { + name: "requested data is not one step/window", + requestedStart: 1738576800000, + requestedEnd: 1738576800001, + step: 60, + cacheKey: "testKey13", + cachedData: []querycache.CachedSeriesData{ + {Start: 1738576800000, End: 1738576860000, Data: []*v3.Series{}}, + }, + expectedMiss: []querycache.MissInterval{{Start: 1738576800000, End: 1738576800001}}, + }, + { + name: "requested data is exactly one step or aggregation window", + requestedStart: 1738576800000, + requestedEnd: 1738576860000, + step: 60, + cacheKey: "testKey13", + cachedData: []querycache.CachedSeriesData{ + {Start: 1738576800000, End: 1738576860000, Data: []*v3.Series{}}, + }, + expectedMiss: nil, + }, + { + name: "start is between a cache aggregate interval and end outside of cache aggregate interval", + requestedStart: 1738576800000, // 03 Feb 2025 10:00:00 + requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00 + step: 86400, // 24 hours + cacheKey: "testKey13", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738540800000, // 03 Feb 2025 00:00:00 + End: 1738713600000, // 05 Feb 2025 00:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738576800000, End: 1738627200000}, // 03 Feb 2025 10:00:00 - 04 Feb 2025 00:00:00 + {Start: 1738713600000, End: 1738749600000}, // 05 Feb 2025 00:00:00 - 05 Feb 2025 10:00:00 + }, + }, + { + name: "start is the start of aggregate interval and end is between two aggregate intervals", + requestedStart: 1738540800000, // 03 Feb 2025 00:00:00 + requestedEnd: 1738749600000, // 05 Feb 2025 10:00:00 + step: 86400, // 24 hours + cacheKey: "testKey13", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738540800000, // 03 Feb 2025 00:00:00 + End: 1738713600000, // 05 Feb 2025 00:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738713600000, End: 1738749600000}, // 05 Feb 2025 00:00:00 - 05 Feb 2025 10:00:00 + }, + }, + { + name: "1. start lies near the start of aggregation interval and end lies near the end of another aggregation interval", + requestedStart: 1738541400000, // 03 Feb 2025 00:10:00 + requestedEnd: 1738713000000, // 04 Feb 2025 11:50:00 + step: 86400, // 24 hours + cacheKey: "testKey13", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738540800000, // 03 Feb 2025 00:00:00 + End: 1738713600000, // 05 Feb 2025 00:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738541400000, End: 1738713000000}, // 03 Feb 2025 00:10:00 - 04 Feb 2025 11:50:00 + }, + }, + { + name: "2. start lies near the start of aggregation interval and end lies near the end of another aggregation interval", + requestedStart: 1738411859000, // 01 Feb 2025 00:10:00 + requestedEnd: 1738713000000, // 04 Feb 2025 11:50:00 + step: 86400, // 24 hours + cacheKey: "testKey13", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738540800000, // 03 Feb 2025 00:00:00 + End: 1738713600000, // 05 Feb 2025 00:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738411859000, End: 1738540800000}, // 01 Feb 2025 00:10:00 - 03 Feb 2025 00:00:00 + {Start: 1738627200000, End: 1738713000000}, // 04 Feb 2025 00:00:00 - 04 Feb 2025 11:50:00 + }, + }, + { + name: "start is before cache and end lies at the end of cache aggregation interval", + requestedStart: 1738498255000, // 02 Feb 2025 12:10:00 + requestedEnd: 1738713600000, // 05 Feb 2025 00:00:00 + step: 86400, // 24 hours + cacheKey: "testKey13", + cachedData: []querycache.CachedSeriesData{ + { + Start: 1738540800000, // 03 Feb 2025 00:00:00 + End: 1738713600000, // 05 Feb 2025 00:00:00 + Data: []*v3.Series{}, + }, + }, + expectedMiss: []querycache.MissInterval{ + {Start: 1738498255000, End: 1738540800000}, // 03 Feb 2025 00:10:00 - 03 Feb 2025 00:00:00 + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + // Store the cached data in the mock cache + if len(tc.cachedData) > 0 { + cachedDataJSON, err := json.Marshal(tc.cachedData) + assert.NoError(t, err) + err = mockCache.Store(tc.cacheKey, cachedDataJSON, 0) + assert.NoError(t, err) + } + + // Call FindMissingTimeRanges + missingRanges := q.FindMissingTimeRangesV2(tc.requestedStart, tc.requestedEnd, tc.step, tc.cacheKey) + + // Verify the missing ranges + assert.Equal(t, tc.expectedMiss, missingRanges) + }) + } +} + func TestMergeWithCachedSeriesData(t *testing.T) { // Initialize the mock cache mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})