mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-15 01:15:54 +08:00
Store complete intervals in cache and update logic for response (#7212)
* fix: new implementation for finding missing timerange * fix: remove unwanted code * fix: update if condition * fix: update logic and the test cases * fix: correct name * fix: filter points which are not a complete agg interval * fix: fix the logic to use the points correctly * fix: fix overlapping test case * fix: add comments * Update pkg/query-service/querycache/query_range_cache.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix: use step ms * fix: use step ms * fix: tests * fix: update logic to handle actual empty series * fix: name updated * Update pkg/query-service/app/querier/v2/helper.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix: address comments * fix: address comments * fix: address comments * Update pkg/query-service/common/query_range.go Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix: add error log * fix: handle case where end is equal to a complete window end * fix: added comments * fix: address comments * fix: move function to common query range --------- Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
This commit is contained in:
parent
946a249c85
commit
9a3c49bce4
@ -122,6 +122,7 @@ func (q *querier) runBuilderQuery(
|
|||||||
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
|
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
|
||||||
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
|
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
|
||||||
missedSeries := make([]querycache.CachedSeriesData, 0)
|
missedSeries := make([]querycache.CachedSeriesData, 0)
|
||||||
|
filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
|
||||||
for _, miss := range misses {
|
for _, miss := range misses {
|
||||||
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
|
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -138,15 +139,32 @@ func (q *querier) runBuilderQuery(
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
missedSeries = append(missedSeries, querycache.CachedSeriesData{
|
filteredSeries, startTime, endTime := common.FilterSeriesPoints(series, miss.Start, miss.End, builderQuery.StepInterval)
|
||||||
Start: miss.Start,
|
|
||||||
End: miss.End,
|
// making sure that empty range doesn't doesn't enter the cache
|
||||||
Data: series,
|
// empty results from filteredSeries means data was filtered out, but empty series means actual empty data
|
||||||
|
if len(filteredSeries) > 0 || len(series) == 0 {
|
||||||
|
filteredMissedSeries = append(filteredMissedSeries, querycache.CachedSeriesData{
|
||||||
|
Data: filteredSeries,
|
||||||
|
Start: startTime,
|
||||||
|
End: endTime,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)
|
|
||||||
|
|
||||||
resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
|
// for the actual response
|
||||||
|
missedSeries = append(missedSeries, querycache.CachedSeriesData{
|
||||||
|
Data: series,
|
||||||
|
Start: miss.Start,
|
||||||
|
End: miss.End,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], filteredMissedSeries)
|
||||||
|
q.queryCache.StoreSeriesInCache(cacheKeys[queryName], filteredMergedSeries)
|
||||||
|
|
||||||
|
mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], missedSeries)
|
||||||
|
|
||||||
|
resultSeries := common.GetSeriesFromCachedDataV2(mergedSeries, start, end, builderQuery.StepInterval)
|
||||||
|
|
||||||
ch <- channelResult{
|
ch <- channelResult{
|
||||||
Err: nil,
|
Err: nil,
|
||||||
|
@ -119,9 +119,10 @@ func (q *querier) runBuilderQuery(
|
|||||||
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
|
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
|
misses := q.queryCache.FindMissingTimeRangesV2(start, end, builderQuery.StepInterval, cacheKeys[queryName])
|
||||||
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
|
zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
|
||||||
missedSeries := make([]querycache.CachedSeriesData, 0)
|
missedSeries := make([]querycache.CachedSeriesData, 0)
|
||||||
|
filteredMissedSeries := make([]querycache.CachedSeriesData, 0)
|
||||||
for _, miss := range misses {
|
for _, miss := range misses {
|
||||||
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
|
query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -138,15 +139,33 @@ func (q *querier) runBuilderQuery(
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
filteredSeries, startTime, endTime := common.FilterSeriesPoints(series, miss.Start, miss.End, builderQuery.StepInterval)
|
||||||
|
|
||||||
|
// making sure that empty range doesn't doesn't enter the cache
|
||||||
|
// empty results from filteredSeries means data was filtered out, but empty series means actual empty data
|
||||||
|
if len(filteredSeries) > 0 || len(series) == 0 {
|
||||||
|
filteredMissedSeries = append(filteredMissedSeries, querycache.CachedSeriesData{
|
||||||
|
Data: filteredSeries,
|
||||||
|
Start: startTime,
|
||||||
|
End: endTime,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// for the actual response
|
||||||
missedSeries = append(missedSeries, querycache.CachedSeriesData{
|
missedSeries = append(missedSeries, querycache.CachedSeriesData{
|
||||||
Data: series,
|
Data: series,
|
||||||
Start: miss.Start,
|
Start: miss.Start,
|
||||||
End: miss.End,
|
End: miss.End,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)
|
|
||||||
|
|
||||||
resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
|
filteredMergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], filteredMissedSeries)
|
||||||
|
q.queryCache.StoreSeriesInCache(cacheKeys[queryName], filteredMergedSeries)
|
||||||
|
|
||||||
|
mergedSeries := q.queryCache.MergeWithCachedSeriesDataV2(cacheKeys[queryName], missedSeries)
|
||||||
|
|
||||||
|
resultSeries := common.GetSeriesFromCachedDataV2(mergedSeries, start, end, builderQuery.StepInterval)
|
||||||
|
|
||||||
ch <- channelResult{
|
ch <- channelResult{
|
||||||
Err: nil,
|
Err: nil,
|
||||||
|
@ -3,6 +3,7 @@ package common
|
|||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
"unicode"
|
"unicode"
|
||||||
|
|
||||||
@ -123,3 +124,108 @@ func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int6
|
|||||||
}
|
}
|
||||||
return newSeries
|
return newSeries
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It is different from GetSeriesFromCachedData because doesn't remove a point if it is >= (start - (start % step*1000))
|
||||||
|
func GetSeriesFromCachedDataV2(data []querycache.CachedSeriesData, start, end, step int64) []*v3.Series {
|
||||||
|
series := make(map[uint64]*v3.Series)
|
||||||
|
|
||||||
|
for _, cachedData := range data {
|
||||||
|
for _, data := range cachedData.Data {
|
||||||
|
h := labels.FromMap(data.Labels).Hash()
|
||||||
|
|
||||||
|
if _, ok := series[h]; !ok {
|
||||||
|
series[h] = &v3.Series{
|
||||||
|
Labels: data.Labels,
|
||||||
|
LabelsArray: data.LabelsArray,
|
||||||
|
Points: make([]v3.Point, 0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, point := range data.Points {
|
||||||
|
if point.Timestamp >= (start-(start%(step*1000))) && point.Timestamp <= end {
|
||||||
|
series[h].Points = append(series[h].Points, point)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newSeries := make([]*v3.Series, 0, len(series))
|
||||||
|
for _, s := range series {
|
||||||
|
s.SortPoints()
|
||||||
|
s.RemoveDuplicatePoints()
|
||||||
|
newSeries = append(newSeries, s)
|
||||||
|
}
|
||||||
|
return newSeries
|
||||||
|
}
|
||||||
|
|
||||||
|
// filter series points for storing in cache
|
||||||
|
func FilterSeriesPoints(seriesList []*v3.Series, missStart, missEnd int64, stepInterval int64) ([]*v3.Series, int64, int64) {
|
||||||
|
filteredSeries := make([]*v3.Series, 0)
|
||||||
|
startTime := missStart
|
||||||
|
endTime := missEnd
|
||||||
|
|
||||||
|
stepMs := stepInterval * 1000
|
||||||
|
|
||||||
|
// return empty series if the interval is not complete
|
||||||
|
if missStart+stepMs > missEnd {
|
||||||
|
return []*v3.Series{}, missStart, missEnd
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the end time is not a complete aggregation window, then we will have to adjust the end time
|
||||||
|
// to the previous complete aggregation window end
|
||||||
|
endCompleteWindow := missEnd%stepMs == 0
|
||||||
|
if !endCompleteWindow {
|
||||||
|
endTime = missEnd - (missEnd % stepMs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the start time is not a complete aggregation window, then we will have to adjust the start time
|
||||||
|
// to the next complete aggregation window
|
||||||
|
if missStart%stepMs != 0 {
|
||||||
|
startTime = missStart + stepMs - (missStart % stepMs)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, series := range seriesList {
|
||||||
|
// if data for the series is empty, then we will add it to the cache
|
||||||
|
if len(series.Points) == 0 {
|
||||||
|
filteredSeries = append(filteredSeries, &v3.Series{
|
||||||
|
Labels: series.Labels,
|
||||||
|
LabelsArray: series.LabelsArray,
|
||||||
|
Points: make([]v3.Point, 0),
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort the points based on timestamp
|
||||||
|
sort.Slice(series.Points, func(i, j int) bool {
|
||||||
|
return series.Points[i].Timestamp < series.Points[j].Timestamp
|
||||||
|
})
|
||||||
|
|
||||||
|
points := make([]v3.Point, len(series.Points))
|
||||||
|
copy(points, series.Points)
|
||||||
|
|
||||||
|
// Filter the first point that is not a complete aggregation window
|
||||||
|
if series.Points[0].Timestamp < missStart {
|
||||||
|
// Remove the first point
|
||||||
|
points = points[1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// filter the last point if it is not a complete aggregation window
|
||||||
|
// adding or condition to handle the end time is equal to a complete window end https://github.com/SigNoz/signoz/pull/7212#issuecomment-2703677190
|
||||||
|
if (!endCompleteWindow && series.Points[len(series.Points)-1].Timestamp == missEnd-(missEnd%stepMs)) ||
|
||||||
|
(endCompleteWindow && series.Points[len(series.Points)-1].Timestamp == missEnd) {
|
||||||
|
// Remove the last point
|
||||||
|
points = points[:len(points)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// making sure that empty range doesn't enter the cache
|
||||||
|
if len(points) > 0 {
|
||||||
|
filteredSeries = append(filteredSeries, &v3.Series{
|
||||||
|
Labels: series.Labels,
|
||||||
|
LabelsArray: series.LabelsArray,
|
||||||
|
Points: points,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return filteredSeries, startTime, endTime
|
||||||
|
}
|
||||||
|
435
pkg/query-service/common/query_range_test.go
Normal file
435
pkg/query-service/common/query_range_test.go
Normal file
@ -0,0 +1,435 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
"go.signoz.io/signoz/pkg/query-service/querycache"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFilterSeriesPoints(t *testing.T) {
|
||||||
|
// Define test cases
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
seriesList []*v3.Series
|
||||||
|
missStart int64 // in milliseconds
|
||||||
|
missEnd int64 // in milliseconds
|
||||||
|
stepInterval int64 // in seconds
|
||||||
|
expectedPoints []*v3.Series
|
||||||
|
expectedStart int64 // in milliseconds
|
||||||
|
expectedEnd int64 // in milliseconds
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Complete aggregation window",
|
||||||
|
missStart: 1609459200000, // 01 Jan 2021 00:00:00 UTC
|
||||||
|
missEnd: 1609466400000, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609459200000, Value: 1.0}, // 01 Jan 2021 00:00:00 UTC
|
||||||
|
{Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609459200000, Value: 1.0},
|
||||||
|
{Timestamp: 1609462800000, Value: 2.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedStart: 1609459200000,
|
||||||
|
expectedEnd: 1609466400000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Filter first point",
|
||||||
|
missStart: 1609464600000, // 01 Jan 2021 01:30:00 UTC
|
||||||
|
missEnd: 1609470000000, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedStart: 1609466400000,
|
||||||
|
expectedEnd: 1609470000000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Filter last point",
|
||||||
|
missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
missEnd: 1609471800000, // 01 Jan 2021 03:30:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 3.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedStart: 1609466400000,
|
||||||
|
expectedEnd: 1609470000000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Incomplete aggregation window",
|
||||||
|
missStart: 1609470000000, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
missEnd: 1609471800000, // 01 Jan 2021 03:30:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{},
|
||||||
|
expectedStart: 1609470000000,
|
||||||
|
expectedEnd: 1609471800000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Filter first point with multiple series",
|
||||||
|
missStart: 1609464600000, // 01 Jan 2021 01:30:00 UTC
|
||||||
|
missEnd: 1609477200000, // 01 Jan 2021 05:00:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609462800000, Value: 2.0}, // 01 Jan 2021 01:00:00 UTC
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
{Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 7.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
{Timestamp: 1609473600000, Value: 8.0}, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
{Timestamp: 1609473600000, Value: 11.0}, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
{Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 7.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
{Timestamp: 1609473600000, Value: 8.0}, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
{Timestamp: 1609473600000, Value: 11.0}, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedStart: 1609466400000,
|
||||||
|
expectedEnd: 1609477200000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Filter last point",
|
||||||
|
missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
{Timestamp: 1609473600000, Value: 5.0}, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 3.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 4.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 6.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 9.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 10.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedStart: 1609466400000,
|
||||||
|
expectedEnd: 1609473600000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "half range should return empty result",
|
||||||
|
missStart: 1609473600000, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609473600000, Value: 1.0}, // 01 Jan 2021 04:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{},
|
||||||
|
expectedStart: 1609473600000,
|
||||||
|
expectedEnd: 1609475400000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "respect actual empty series",
|
||||||
|
missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
missEnd: 1609475400000, // 01 Jan 2021 04:30:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedStart: 1609466400000,
|
||||||
|
expectedEnd: 1609473600000,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Remove point that is not a complete aggregation window",
|
||||||
|
missStart: 1609466400000, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
missEnd: 1609470000000, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
stepInterval: 3600, // 1 hour
|
||||||
|
seriesList: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 2.0}, // 01 Jan 2021 02:00:00 UTC
|
||||||
|
{Timestamp: 1609470000000, Value: 3.0}, // 01 Jan 2021 03:00:00 UTC
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedPoints: []*v3.Series{
|
||||||
|
{
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609466400000, Value: 2.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedStart: 1609466400000,
|
||||||
|
expectedEnd: 1609470000000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
filteredSeries, startTime, endTime := FilterSeriesPoints(tc.seriesList, tc.missStart, tc.missEnd, tc.stepInterval)
|
||||||
|
|
||||||
|
if len(tc.expectedPoints) != len(filteredSeries) {
|
||||||
|
t.Errorf("Expected %d series, got %d", len(tc.expectedPoints), len(filteredSeries))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range tc.expectedPoints {
|
||||||
|
if len(tc.expectedPoints[i].Points) != len(filteredSeries[i].Points) {
|
||||||
|
t.Errorf("Series %d: Expected %d points, got %d\nExpected points: %+v\nGot points: %+v",
|
||||||
|
i,
|
||||||
|
len(tc.expectedPoints[i].Points),
|
||||||
|
len(filteredSeries[i].Points),
|
||||||
|
tc.expectedPoints[i].Points,
|
||||||
|
filteredSeries[i].Points)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for j := range tc.expectedPoints[i].Points {
|
||||||
|
if tc.expectedPoints[i].Points[j].Timestamp != filteredSeries[i].Points[j].Timestamp {
|
||||||
|
t.Errorf("Series %d Point %d: Expected timestamp %d, got %d", i, j, tc.expectedPoints[i].Points[j].Timestamp, filteredSeries[i].Points[j].Timestamp)
|
||||||
|
}
|
||||||
|
if tc.expectedPoints[i].Points[j].Value != filteredSeries[i].Points[j].Value {
|
||||||
|
t.Errorf("Series %d Point %d: Expected value %f, got %f", i, j, tc.expectedPoints[i].Points[j].Value, filteredSeries[i].Points[j].Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.expectedStart != startTime {
|
||||||
|
t.Errorf("Expected start time %d, got %d", tc.expectedStart, startTime)
|
||||||
|
}
|
||||||
|
if tc.expectedEnd != endTime {
|
||||||
|
t.Errorf("Expected end time %d, got %d", tc.expectedEnd, endTime)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetSeriesFromCachedData(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
data []querycache.CachedSeriesData
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
expectedCount int
|
||||||
|
expectedPoints int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Single point outside range",
|
||||||
|
data: []querycache.CachedSeriesData{
|
||||||
|
{
|
||||||
|
Data: []*v3.Series{
|
||||||
|
{
|
||||||
|
Labels: map[string]string{"label1": "value1"},
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609473600000, Value: 1.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
start: 1609475400000, // 01 Jan 2021 04:30:00 UTC
|
||||||
|
end: 1609477200000, // 01 Jan 2021 05:00:00 UTC
|
||||||
|
expectedCount: 1,
|
||||||
|
expectedPoints: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Single point inside range",
|
||||||
|
data: []querycache.CachedSeriesData{
|
||||||
|
{
|
||||||
|
Data: []*v3.Series{
|
||||||
|
{
|
||||||
|
Labels: map[string]string{"label1": "value1"},
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609476000000, Value: 1.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
start: 1609475400000, // 01 Jan 2021 04:30:00 UTC
|
||||||
|
end: 1609477200000, // 01 Jan 2021 05:00:00 UTC
|
||||||
|
expectedCount: 1,
|
||||||
|
expectedPoints: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
series := GetSeriesFromCachedData(tc.data, tc.start, tc.end)
|
||||||
|
|
||||||
|
if len(series) != tc.expectedCount {
|
||||||
|
t.Errorf("Expected %d series, got %d", tc.expectedCount, len(series))
|
||||||
|
}
|
||||||
|
if len(series[0].Points) != tc.expectedPoints {
|
||||||
|
t.Errorf("Expected %d points, got %d", tc.expectedPoints, len(series[0].Points))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetSeriesFromCachedDataV2(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
data []querycache.CachedSeriesData
|
||||||
|
start int64
|
||||||
|
end int64
|
||||||
|
step int64
|
||||||
|
expectedCount int
|
||||||
|
expectedPoints int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Single point outside range",
|
||||||
|
data: []querycache.CachedSeriesData{
|
||||||
|
{
|
||||||
|
Data: []*v3.Series{
|
||||||
|
{
|
||||||
|
Labels: map[string]string{"label1": "value1"},
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609473600000, Value: 1.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
start: 1609475400000,
|
||||||
|
end: 1609477200000,
|
||||||
|
step: 1000,
|
||||||
|
expectedCount: 1,
|
||||||
|
expectedPoints: 0,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Single point inside range",
|
||||||
|
data: []querycache.CachedSeriesData{
|
||||||
|
{
|
||||||
|
Data: []*v3.Series{
|
||||||
|
{
|
||||||
|
Labels: map[string]string{"label1": "value1"},
|
||||||
|
Points: []v3.Point{
|
||||||
|
{Timestamp: 1609476000000, Value: 1.0},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
start: 1609475400000,
|
||||||
|
end: 1609477200000,
|
||||||
|
step: 1000,
|
||||||
|
expectedCount: 1,
|
||||||
|
expectedPoints: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
series := GetSeriesFromCachedDataV2(tc.data, tc.start, tc.end, tc.step)
|
||||||
|
|
||||||
|
if len(series) != tc.expectedCount {
|
||||||
|
t.Errorf("Expected %d series, got %d", tc.expectedCount, len(series))
|
||||||
|
}
|
||||||
|
if len(series[0].Points) != tc.expectedPoints {
|
||||||
|
t.Errorf("Expected %d points, got %d", tc.expectedPoints, len(series[0].Points))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -149,4 +149,6 @@ type QueryCache interface {
|
|||||||
FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval
|
FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval
|
||||||
FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []querycache.MissInterval
|
FindMissingTimeRangesV2(start, end int64, step int64, cacheKey string) []querycache.MissInterval
|
||||||
MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData
|
MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData
|
||||||
|
StoreSeriesInCache(cacheKey string, series []querycache.CachedSeriesData)
|
||||||
|
MergeWithCachedSeriesDataV2(cacheKey string, series []querycache.CachedSeriesData) []querycache.CachedSeriesData
|
||||||
}
|
}
|
||||||
|
@ -264,6 +264,9 @@ func (q *queryCache) mergeSeries(cachedSeries, missedSeries []*v3.Series) []*v3.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesData) {
|
func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesData) {
|
||||||
|
if q.cache == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
mergedDataJSON, err := json.Marshal(mergedData)
|
mergedDataJSON, err := json.Marshal(mergedData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
zap.L().Error("error marshalling merged data", zap.Error(err))
|
zap.L().Error("error marshalling merged data", zap.Error(err))
|
||||||
@ -275,8 +278,7 @@ func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesD
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
|
func (q *queryCache) MergeWithCachedSeriesDataV2(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
|
||||||
|
|
||||||
if q.cache == nil {
|
if q.cache == nil {
|
||||||
return newData
|
return newData
|
||||||
}
|
}
|
||||||
@ -284,8 +286,7 @@ func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []Cached
|
|||||||
cachedData, _, _ := q.cache.Retrieve(cacheKey, true)
|
cachedData, _, _ := q.cache.Retrieve(cacheKey, true)
|
||||||
var existingData []CachedSeriesData
|
var existingData []CachedSeriesData
|
||||||
if err := json.Unmarshal(cachedData, &existingData); err != nil {
|
if err := json.Unmarshal(cachedData, &existingData); err != nil {
|
||||||
// In case of error, we return the entire range as a miss
|
zap.L().Error("error unmarshalling existing data", zap.Error(err))
|
||||||
q.storeMergedData(cacheKey, newData)
|
|
||||||
return newData
|
return newData
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -330,7 +331,16 @@ func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []Cached
|
|||||||
mergedData = append(mergedData, *current)
|
mergedData = append(mergedData, *current)
|
||||||
}
|
}
|
||||||
|
|
||||||
q.storeMergedData(cacheKey, mergedData)
|
|
||||||
|
|
||||||
return mergedData
|
return mergedData
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
|
||||||
|
|
||||||
|
mergedData := q.MergeWithCachedSeriesDataV2(cacheKey, newData)
|
||||||
|
q.storeMergedData(cacheKey, mergedData)
|
||||||
|
return mergedData
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *queryCache) StoreSeriesInCache(cacheKey string, series []CachedSeriesData) {
|
||||||
|
q.storeMergedData(cacheKey, series)
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user