chore: add support for caching multiple time ranges for cache key (#6008)

Srikanth Chekuri 2024-09-20 19:23:01 +05:30 committed by GitHub
parent 4aabfe7cf5
commit 033b64a62a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 964 additions and 744 deletions
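The new querycache package itself is not among the hunks shown here; only its call sites are. For orientation, this is a sketch of the types and calls the diff below leans on, inferred purely from those call sites (exact field order, JSON tags, and where the interface is declared are assumptions, not part of this commit):

package querycache

import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"

// MissInterval is a time range, in milliseconds, that is absent from the
// cache for a given key and must be fetched from the database.
type MissInterval struct {
    Start, End int64
}

// CachedSeriesData pairs the series for one time range with that range, so
// several disjoint ranges can live under a single cache key; the tests below
// marshal a []*CachedSeriesData as the stored cache value.
type CachedSeriesData struct {
    Start int64
    End   int64
    Data  []*v3.Series
}

// Calls made by the queriers in this diff (the interface itself is referenced
// as interfaces.QueryCache):
//
//   qc := querycache.NewQueryCache(querycache.WithCache(c), querycache.WithFluxInterval(d))
//   misses := qc.FindMissingTimeRanges(start, end, step, cacheKey) // []MissInterval
//   merged := qc.MergeWithCachedSeriesData(cacheKey, missedSeries) // []CachedSeriesData
//   result := common.GetSeriesFromCachedData(merged, start, end)   // []*v3.Series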


@@ -2,20 +2,19 @@ package querier

 import (
     "context"
-    "encoding/json"
     "fmt"
     "strings"
     "sync"
-    "time"

     logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
     logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
     metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
     tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
-    "go.signoz.io/signoz/pkg/query-service/cache/status"
+    "go.signoz.io/signoz/pkg/query-service/common"
     "go.signoz.io/signoz/pkg/query-service/constants"
     v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
     "go.signoz.io/signoz/pkg/query-service/postprocess"
+    "go.signoz.io/signoz/pkg/query-service/querycache"
     "go.uber.org/zap"
 )
@@ -107,7 +106,8 @@ func (q *querier) runBuilderQuery(
     if builderQuery.DataSource == v3.DataSourceLogs {
         var query string
         var err error
-        if _, ok := cacheKeys[queryName]; !ok {
+        if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
+            zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
             query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM)
             if err != nil {
                 ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
@@ -118,21 +118,11 @@ func (q *querier) runBuilderQuery(
             return
         }
-        cacheKey := cacheKeys[queryName]
-        var cachedData []byte
-        if !params.NoCache && q.cache != nil {
-            var retrieveStatus status.RetrieveStatus
-            data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
-            zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
-            if err == nil {
-                cachedData = data
-            }
-        }
-        misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
-        missedSeries := make([]*v3.Series, 0)
-        cachedSeries := make([]*v3.Series, 0)
+        misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
+        zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
+        missedSeries := make([]querycache.CachedSeriesData, 0)
         for _, miss := range misses {
-            query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.start, miss.end, builderQuery, params, preferRPM)
+            query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
             if err != nil {
                 ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
                 return
@@ -147,48 +137,23 @@ func (q *querier) runBuilderQuery(
             }
             return
         }
-            missedSeries = append(missedSeries, series...)
+            missedSeries = append(missedSeries, querycache.CachedSeriesData{
+                Start: miss.Start,
+                End:   miss.End,
+                Data:  series,
+            })
         }
-        if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
-            zap.L().Error("error unmarshalling cached data", zap.Error(err))
-        }
-        mergedSeries := mergeSerieses(cachedSeries, missedSeries)
-        if replaceCachedData {
-            mergedSeries = missedSeries
-        }
-        var mergedSeriesData []byte
-        var marshallingErr error
-        missedSeriesLen := len(missedSeries)
-        if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
-            // caching the data
-            mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
-            if marshallingErr != nil {
-                zap.L().Error("error marshalling merged series", zap.Error(marshallingErr))
-            }
-        }
-        // response doesn't need everything
-        filterCachedPoints(mergedSeries, start, end)
+        mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)
+        resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
         ch <- channelResult{
             Err:    nil,
             Name:   queryName,
-            Series: mergedSeries,
+            Series: resultSeries,
         }
-        // Cache the seriesList for future queries
-        if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
-            // caching the data
-            err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
-            if err != nil {
-                zap.L().Error("error storing merged series", zap.Error(err))
-                return
-            }
-        }
         return
     }
     if builderQuery.DataSource == v3.DataSourceTraces {
@@ -242,7 +207,8 @@ func (q *querier) runBuilderQuery(
         // What is happening here?
         // We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
         // If the query is not cached, we execute the query and return the result without caching it.
-        if _, ok := cacheKeys[queryName]; !ok {
+        if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
+            zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", start), zap.Int64("end", end), zap.Int64("step", builderQuery.StepInterval), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
             query, err := metricsV3.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
             if err != nil {
                 ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
@@ -254,22 +220,13 @@ func (q *querier) runBuilderQuery(
         }
         cacheKey := cacheKeys[queryName]
-        var cachedData []byte
-        if !params.NoCache && q.cache != nil {
-            var retrieveStatus status.RetrieveStatus
-            data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
-            zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
-            if err == nil {
-                cachedData = data
-            }
-        }
-        misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
-        missedSeries := make([]*v3.Series, 0)
-        cachedSeries := make([]*v3.Series, 0)
+        misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKey)
+        zap.L().Info("cache misses for metrics query", zap.Any("misses", misses))
+        missedSeries := make([]querycache.CachedSeriesData, 0)
         for _, miss := range misses {
             query, err := metricsV3.PrepareMetricQuery(
-                miss.start,
-                miss.end,
+                miss.Start,
+                miss.End,
                 params.CompositeQuery.QueryType,
                 params.CompositeQuery.PanelType,
                 builderQuery,
@@ -294,41 +251,20 @@ func (q *querier) runBuilderQuery(
             }
             return
         }
-            missedSeries = append(missedSeries, series...)
+            missedSeries = append(missedSeries, querycache.CachedSeriesData{
+                Start: miss.Start,
+                End:   miss.End,
+                Data:  series,
+            })
         }
-        if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
-            zap.L().Error("error unmarshalling cached data", zap.Error(err))
-        }
-        mergedSeries := mergeSerieses(cachedSeries, missedSeries)
-        if replaceCachedData {
-            mergedSeries = missedSeries
-        }
-        var mergedSeriesData []byte
-        var marshallingErr error
-        missedSeriesLen := len(missedSeries)
-        if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
-            // caching the data
-            mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
-            if marshallingErr != nil {
-                zap.L().Error("error marshalling merged series", zap.Error(marshallingErr))
-            }
-        }
-        // response doesn't need everything
-        filterCachedPoints(mergedSeries, start, end)
+        mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)
+        resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
         ch <- channelResult{
             Err:    nil,
             Name:   queryName,
-            Series: mergedSeries,
+            Series: resultSeries,
         }
-        // Cache the seriesList for future queries
-        if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
-            err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
-            if err != nil {
-                zap.L().Error("error storing merged series", zap.Error(err))
-                return
-            }
-        }
     }
@@ -350,7 +286,8 @@ func (q *querier) runBuilderExpression(
         return
     }
-    if _, ok := cacheKeys[queryName]; !ok {
+    if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
+        zap.L().Info("skipping cache for expression query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
         query := queries[queryName]
         series, err := q.execClickHouseQuery(ctx, query)
         ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
@@ -358,23 +295,14 @@ func (q *querier) runBuilderExpression(
     }
     cacheKey := cacheKeys[queryName]
-    var cachedData []byte
-    if !params.NoCache && q.cache != nil {
-        var retrieveStatus status.RetrieveStatus
-        data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
-        zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
-        if err == nil {
-            cachedData = data
-        }
-    }
     step := postprocess.StepIntervalForFunction(params, queryName)
-    misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, step, cachedData)
-    missedSeries := make([]*v3.Series, 0)
-    cachedSeries := make([]*v3.Series, 0)
+    misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, step, cacheKey)
+    zap.L().Info("cache misses for expression query", zap.Any("misses", misses))
+    missedSeries := make([]querycache.CachedSeriesData, 0)
     for _, miss := range misses {
         missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{
-            Start:          miss.start,
-            End:            miss.end,
+            Start:          miss.Start,
+            End:            miss.End,
             Step:           params.Step,
             NoCache:        params.NoCache,
             CompositeQuery: params.CompositeQuery,
@@ -386,41 +314,19 @@ func (q *querier) runBuilderExpression(
             ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
             return
         }
-        missedSeries = append(missedSeries, series...)
+        missedSeries = append(missedSeries, querycache.CachedSeriesData{
+            Start: miss.Start,
+            End:   miss.End,
+            Data:  series,
+        })
     }
-    if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
-        zap.L().Error("error unmarshalling cached data", zap.Error(err))
-    }
-    mergedSeries := mergeSerieses(cachedSeries, missedSeries)
-    if replaceCachedData {
-        mergedSeries = missedSeries
-    }
-    var mergedSeriesData []byte
-    missedSeriesLen := len(missedSeries)
-    var marshallingErr error
-    if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
-        // caching the data
-        mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
-        if marshallingErr != nil {
-            zap.L().Error("error marshalling merged series", zap.Error(marshallingErr))
-        }
-    }
-    // response doesn't need everything
-    filterCachedPoints(mergedSeries, params.Start, params.End)
+    mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)
+    resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End)
    ch <- channelResult{
         Err:    nil,
         Name:   queryName,
-        Series: mergedSeries,
+        Series: resultSeries,
     }
-    // Cache the seriesList for future queries
-    if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
-        err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
-        if err != nil {
-            zap.L().Error("error storing merged series", zap.Error(err))
-            return
-        }
-    }
 }
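Every branch above (logs, metrics, expressions) now follows the same four-step shape instead of the old retrieve/unmarshal/merge/marshal/store sequence. A condensed sketch of that shape; runWithCache and fetch are illustrative names rather than code from this commit, and MergeWithCachedSeriesData is assumed to persist the merged ranges itself, since no explicit cache.Store call remains in the callers:

func runWithCache(
    qc interfaces.QueryCache,
    cacheKey string,
    start, end, step int64, // start/end in milliseconds, step in seconds
    fetch func(start, end int64) ([]*v3.Series, error), // e.g. prepare + execute one query
) ([]*v3.Series, error) {
    // 1. Ask the cache which sub-ranges of [start, end] are not cached yet.
    misses := qc.FindMissingTimeRanges(start, end, step, cacheKey)

    // 2. Run the underlying query only for the missing sub-ranges.
    missedSeries := make([]querycache.CachedSeriesData, 0, len(misses))
    for _, miss := range misses {
        series, err := fetch(miss.Start, miss.End)
        if err != nil {
            return nil, err
        }
        missedSeries = append(missedSeries, querycache.CachedSeriesData{
            Start: miss.Start,
            End:   miss.End,
            Data:  series,
        })
    }

    // 3. Merge the fresh ranges into whatever is already cached under the key.
    merged := qc.MergeWithCachedSeriesData(cacheKey, missedSeries)

    // 4. Trim to the requested window; the cache may hold more than was asked for.
    return common.GetSeriesFromCachedData(merged, start, end), nil
}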


@@ -2,11 +2,7 @@ package querier

 import (
     "context"
-    "encoding/json"
     "fmt"
-    "math"
-    "sort"
-    "strings"
     "sync"
     "time"
@@ -15,7 +11,9 @@ import (
     metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
     "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
     tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
+    "go.signoz.io/signoz/pkg/query-service/common"
     chErrors "go.signoz.io/signoz/pkg/query-service/errors"
+    "go.signoz.io/signoz/pkg/query-service/querycache"
     "go.signoz.io/signoz/pkg/query-service/utils"

     "go.signoz.io/signoz/pkg/query-service/cache"
@@ -34,14 +32,11 @@ type channelResult struct {
     Query  string
 }

-type missInterval struct {
-    start, end int64 // in milliseconds
-}
-
 type querier struct {
     cache        cache.Cache
     reader       interfaces.Reader
     keyGenerator cache.KeyGenerator
+    queryCache   interfaces.QueryCache

     fluxInterval time.Duration
@@ -80,8 +75,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
         logsQueryBuilder = logsV4.PrepareLogsQuery
     }

+    qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
+
     return &querier{
         cache:        opts.Cache,
+        queryCache:   qc,
         reader:       opts.Reader,
         keyGenerator: opts.KeyGenerator,
         fluxInterval: opts.FluxInterval,
@@ -154,156 +152,6 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangeParams)
     return seriesList, nil
 }

-// findMissingTimeRanges finds the missing time ranges in the seriesList
-// and returns a list of miss structs. It takes the fluxInterval into
-// account to find the missing time ranges.
-//
-// The [End - fluxInterval, End] is always added to the list of misses, because
-// the data might still be in flux and not yet available in the database.
-//
-// replaceCacheData is used to indicate if the cache data should be replaced instead of merging
-// with the new data
-// TODO: Remove replaceCacheData with a better logic
-func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval, replaceCacheData bool) {
-    replaceCacheData = false
-    var cachedStart, cachedEnd int64
-    for idx := range seriesList {
-        series := seriesList[idx]
-        for pointIdx := range series.Points {
-            point := series.Points[pointIdx]
-            if cachedStart == 0 || point.Timestamp < cachedStart {
-                cachedStart = point.Timestamp
-            }
-            if cachedEnd == 0 || point.Timestamp > cachedEnd {
-                cachedEnd = point.Timestamp
-            }
-        }
-    }
-
-    // time.Now is used because here we are considering the case where data might not
-    // be fully ingested for the last (fluxInterval) minutes
-    endMillis := time.Now().UnixMilli()
-    adjustStep := int64(math.Min(float64(step), 60))
-    roundedMillis := endMillis - (endMillis % (adjustStep * 1000))
-
-    // Exclude the flux interval from the cached end time
-    cachedEnd = int64(
-        math.Min(
-            float64(cachedEnd),
-            float64(roundedMillis-fluxInterval.Milliseconds()),
-        ),
-    )
-
-    // There are five cases to consider:
-    // 1. Cached time range is a subset of the requested time range
-    // 2. Cached time range is a superset of the requested time range
-    // 3. Cached time range is a left overlap of the requested time range
-    // 4. Cached time range is a right overlap of the requested time range
-    // 5. Cached time range is disjoint from the requested time range
-    if cachedStart >= start && cachedEnd <= end {
-        // Case 1: cached time range is a subset of the requested time range
-        // Add misses for the left and right sides of the cached time range
-        misses = append(misses, missInterval{start: start, end: cachedStart - 1})
-        misses = append(misses, missInterval{start: cachedEnd + 1, end: end})
-    } else if cachedStart <= start && cachedEnd >= end {
-        // Case 2: cached time range is a superset of the requested time range
-        // No misses
-    } else if cachedStart <= start && cachedEnd >= start {
-        // Case 3: cached time range is a left overlap of the requested time range
-        // Add a miss for the part to the right of the cached time range
-        misses = append(misses, missInterval{start: cachedEnd + 1, end: end})
-    } else if cachedStart <= end && cachedEnd >= end {
-        // Case 4: cached time range is a right overlap of the requested time range
-        // Add a miss for the part to the left of the cached time range
-        misses = append(misses, missInterval{start: start, end: cachedStart - 1})
-    } else {
-        // Case 5: cached time range is disjoint from the requested time range
-        // Add a miss for the entire requested time range
-        misses = append(misses, missInterval{start: start, end: end})
-        replaceCacheData = true
-    }
-
-    // remove the structs with start > end
-    var validMisses []missInterval
-    for idx := range misses {
-        miss := misses[idx]
-        if miss.start < miss.end {
-            validMisses = append(validMisses, miss)
-        }
-    }
-    return validMisses, replaceCacheData
-}
-
-// findMissingTimeRanges finds the missing time ranges in the cached data
-// and returns them as a list of misses
-func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval, replaceCachedData bool) {
-    var cachedSeriesList []*v3.Series
-    if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil {
-        // In case of error, we return the entire range as a miss
-        return []missInterval{{start: start, end: end}}, true
-    }
-    return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval)
-}
-
-func labelsToString(labels map[string]string) string {
-    type label struct {
-        Key   string
-        Value string
-    }
-    var labelsList []label
-    for k, v := range labels {
-        labelsList = append(labelsList, label{Key: k, Value: v})
-    }
-    sort.Slice(labelsList, func(i, j int) bool {
-        return labelsList[i].Key < labelsList[j].Key
-    })
-    labelKVs := make([]string, len(labelsList))
-    for idx := range labelsList {
-        labelKVs[idx] = labelsList[idx].Key + "=" + labelsList[idx].Value
-    }
-    return fmt.Sprintf("{%s}", strings.Join(labelKVs, ","))
-}
-
-func filterCachedPoints(cachedSeries []*v3.Series, start, end int64) {
-    for _, c := range cachedSeries {
-        points := []v3.Point{}
-        for _, p := range c.Points {
-            if (p.Timestamp < start || p.Timestamp > end) && p.Timestamp != 0 {
-                continue
-            }
-            points = append(points, p)
-        }
-        c.Points = points
-    }
-}
-
-func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
-    // Merge the missed series with the cached series by timestamp
-    mergedSeries := make([]*v3.Series, 0)
-    seriesesByLabels := make(map[string]*v3.Series)
-    for idx := range cachedSeries {
-        series := cachedSeries[idx]
-        seriesesByLabels[labelsToString(series.Labels)] = series
-    }
-    for idx := range missedSeries {
-        series := missedSeries[idx]
-        if _, ok := seriesesByLabels[labelsToString(series.Labels)]; !ok {
-            seriesesByLabels[labelsToString(series.Labels)] = series
-            continue
-        }
-        seriesesByLabels[labelsToString(series.Labels)].Points = append(seriesesByLabels[labelsToString(series.Labels)].Points, series.Points...)
-    }
-    // Sort the points in each series by timestamp
-    for idx := range seriesesByLabels {
-        series := seriesesByLabels[idx]
-        series.SortPoints()
-        series.RemoveDuplicatePoints()
-        mergedSeries = append(mergedSeries, series)
-    }
-    return mergedSeries
-}
-
 func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
     cacheKeys := q.keyGenerator.GenerateKeys(params)
@@ -363,51 +211,34 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3)
         go func(queryName string, promQuery *v3.PromQuery) {
             defer wg.Done()
             cacheKey, ok := cacheKeys[queryName]
-            var cachedData []byte
-            // Ensure NoCache is not set and cache is not nil
-            if !params.NoCache && q.cache != nil && ok {
-                data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
-                zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
-                if err == nil {
-                    cachedData = data
-                }
-            }
-            misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
-            missedSeries := make([]*v3.Series, 0)
-            cachedSeries := make([]*v3.Series, 0)
+            if !ok || params.NoCache {
+                zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
+                query := metricsV3.BuildPromQuery(promQuery, params.Step, params.Start, params.End)
+                series, err := q.execPromQuery(ctx, query)
+                channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series}
+                return
+            }
+            misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey)
+            zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses))
+            missedSeries := make([]querycache.CachedSeriesData, 0)
             for _, miss := range misses {
-                query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.start, miss.end)
+                query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End)
                 series, err := q.execPromQuery(ctx, query)
                 if err != nil {
                     channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil}
                     return
                 }
-                missedSeries = append(missedSeries, series...)
+                missedSeries = append(missedSeries, querycache.CachedSeriesData{
+                    Data:  series,
+                    Start: miss.Start,
+                    End:   miss.End,
+                })
             }
-            if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
-                // ideally we should not be getting an error here
-                zap.L().Error("error unmarshalling cached data", zap.Error(err))
-            }
-            mergedSeries := mergeSerieses(cachedSeries, missedSeries)
-            if replaceCachedData {
-                mergedSeries = missedSeries
-            }
-            channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries}
-            // Cache the seriesList for future queries
-            if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && ok {
-                mergedSeriesData, err := json.Marshal(mergedSeries)
-                if err != nil {
-                    zap.L().Error("error marshalling merged series", zap.Error(err))
-                    return
-                }
-                err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
-                if err != nil {
-                    zap.L().Error("error storing merged series", zap.Error(err))
-                    return
-                }
-            }
+            mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)
+            resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End)
+            channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: resultSeries}
         }(queryName, promQuery)
     }
     wg.Wait()
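The removed findMissingTimeRanges above could only describe the cache as one contiguous [cachedStart, cachedEnd] interval, which is why a disjoint request forced replaceCacheData = true and threw the old entry away. With a list of ranges per key, disjoint windows coexist. A minimal usage sketch modeled on the test setup later in this commit; the key and timestamps are made up, and the expected single middle-gap miss assumes querycache behaves the way those tests expect:

package main

import (
    "encoding/json"
    "fmt"
    "time"

    "go.signoz.io/signoz/pkg/query-service/cache/inmemory"
    "go.signoz.io/signoz/pkg/query-service/querycache"
)

func main() {
    c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
    qc := querycache.NewQueryCache(querycache.WithCache(c))

    // Seed one cache key with two disjoint hour-long ranges.
    base := int64(1675115596722)
    hour := int64(60 * 60 * 1000)
    cached := []*querycache.CachedSeriesData{
        {Start: base, End: base + hour},
        {Start: base + 2*hour, End: base + 3*hour},
    }
    jsonData, _ := json.Marshal(cached)
    _ = c.Store("example-key", jsonData, 5*time.Minute)

    // Requesting the full three-hour window should report only the
    // uncached middle hour as a miss, instead of replacing everything.
    for _, m := range qc.FindMissingTimeRanges(base, base+3*hour, 60, "example-key") {
        fmt.Printf("miss: [%d, %d]\n", m.Start, m.End)
    }
}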


@@ -2,7 +2,9 @@ package querier

 import (
     "context"
+    "encoding/json"
     "fmt"
+    "math"
     "strings"
     "testing"
     "time"
@@ -11,8 +13,33 @@ import (
     tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
     "go.signoz.io/signoz/pkg/query-service/cache/inmemory"
     v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+    "go.signoz.io/signoz/pkg/query-service/querycache"
 )

+func minTimestamp(series []*v3.Series) int64 {
+    min := int64(math.MaxInt64)
+    for _, series := range series {
+        for _, point := range series.Points {
+            if point.Timestamp < min {
+                min = point.Timestamp
+            }
+        }
+    }
+    return min
+}
+
+func maxTimestamp(series []*v3.Series) int64 {
+    max := int64(math.MinInt64)
+    for _, series := range series {
+        for _, point := range series.Points {
+            if point.Timestamp > max {
+                max = point.Timestamp
+            }
+        }
+    }
+    return max
+}
+
 func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
     // There are five scenarios:
     // 1. Cached time range is a subset of the requested time range
@@ -26,7 +53,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
         requestedEnd      int64 // in milliseconds
         requestedStep     int64 // in seconds
         cachedSeries      []*v3.Series
-        expectedMiss      []missInterval
+        expectedMiss      []querycache.MissInterval
         replaceCachedData bool
     }{
         {
@@ -51,14 +78,14 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
                     },
                 },
             },
-            expectedMiss: []missInterval{
+            expectedMiss: []querycache.MissInterval{
                 {
-                    start: 1675115596722,
-                    end:   1675115596722 + 60*60*1000 - 1,
+                    Start: 1675115596722,
+                    End:   1675115596722 + 60*60*1000,
                 },
                 {
-                    start: 1675115596722 + 120*60*1000 + 1,
-                    end:   1675115596722 + 180*60*1000,
+                    Start: 1675115596722 + 120*60*1000,
+                    End:   1675115596722 + 180*60*1000,
                 },
             },
         },
@@ -92,7 +119,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
                     },
                 },
             },
-            expectedMiss: []missInterval{},
+            expectedMiss: []querycache.MissInterval{},
         },
         {
             name: "cached time range is a left overlap of the requested time range",
@@ -120,10 +147,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
                     },
                 },
             },
-            expectedMiss: []missInterval{
+            expectedMiss: []querycache.MissInterval{
                 {
-                    start: 1675115596722 + 120*60*1000 + 1,
-                    end:   1675115596722 + 180*60*1000,
+                    Start: 1675115596722 + 120*60*1000,
+                    End:   1675115596722 + 180*60*1000,
                 },
             },
         },
@@ -153,10 +180,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
                     },
                 },
             },
-            expectedMiss: []missInterval{
+            expectedMiss: []querycache.MissInterval{
                 {
-                    start: 1675115596722,
-                    end:   1675115596722 + 60*60*1000 - 1,
+                    Start: 1675115596722,
+                    End:   1675115596722 + 60*60*1000,
                 },
             },
         },
@@ -186,31 +213,48 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
                     },
                 },
             },
-            expectedMiss: []missInterval{
+            expectedMiss: []querycache.MissInterval{
                 {
-                    start: 1675115596722,
-                    end:   1675115596722 + 180*60*1000,
+                    Start: 1675115596722,
+                    End:   1675115596722 + 180*60*1000,
                 },
             },
             replaceCachedData: true,
         },
     }

-    for _, tc := range testCases {
+    c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
+    qc := querycache.NewQueryCache(querycache.WithCache(c))
+
+    for idx, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            misses, replaceCachedData := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute)
+            cacheKey := fmt.Sprintf("test-cache-key-%d", idx)
+            cachedData := &querycache.CachedSeriesData{
+                Start: minTimestamp(tc.cachedSeries),
+                End:   maxTimestamp(tc.cachedSeries),
+                Data:  tc.cachedSeries,
+            }
+            jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData})
+            if err != nil {
+                t.Errorf("error marshalling cached data: %v", err)
+            }
+            err = c.Store(cacheKey, jsonData, 5*time.Minute)
+            if err != nil {
+                t.Errorf("error storing cached data: %v", err)
+            }
+            misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
             if len(misses) != len(tc.expectedMiss) {
                 t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
             }
-            if replaceCachedData != tc.replaceCachedData {
-                t.Errorf("expected replaceCachedData %t, got %t", tc.replaceCachedData, replaceCachedData)
-            }
             for i, miss := range misses {
-                if miss.start != tc.expectedMiss[i].start {
-                    t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start)
+                if miss.Start != tc.expectedMiss[i].Start {
+                    t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start)
                 }
-                if miss.end != tc.expectedMiss[i].end {
-                    t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end)
+                if miss.End != tc.expectedMiss[i].End {
+                    t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End)
                 }
             }
         })
@@ -226,7 +270,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
         requestedStep int64
         cachedSeries  []*v3.Series
         fluxInterval  time.Duration
-        expectedMiss  []missInterval
+        expectedMiss  []querycache.MissInterval
     }{
         {
             name: "cached time range is a subset of the requested time range",
@@ -251,14 +295,14 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
                 },
             },
             fluxInterval: 5 * time.Minute,
-            expectedMiss: []missInterval{
+            expectedMiss: []querycache.MissInterval{
                 {
-                    start: 1675115596722,
-                    end:   1675115596722 + 60*60*1000 - 1,
+                    Start: 1675115596722,
+                    End:   1675115596722 + 60*60*1000,
                 },
                 {
-                    start: 1675115596722 + 120*60*1000 + 1,
-                    end:   1675115596722 + 180*60*1000,
+                    Start: 1675115596722 + 120*60*1000,
+                    End:   1675115596722 + 180*60*1000,
                 },
             },
         },
@@ -293,7 +337,7 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
                 },
             },
             fluxInterval: 5 * time.Minute,
-            expectedMiss: []missInterval{},
+            expectedMiss: []querycache.MissInterval{},
         },
         {
             name: "cache time range is a left overlap of the requested time range",
@@ -322,10 +366,10 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
                 },
             },
             fluxInterval: 5 * time.Minute,
-            expectedMiss: []missInterval{
+            expectedMiss: []querycache.MissInterval{
                 {
-                    start: 1675115596722 + 120*60*1000 + 1,
-                    end:   1675115596722 + 180*60*1000,
+                    Start: 1675115596722 + 120*60*1000,
+                    End:   1675115596722 + 180*60*1000,
                 },
             },
         },
@@ -356,10 +400,10 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
                 },
             },
             fluxInterval: 5 * time.Minute,
-            expectedMiss: []missInterval{
+            expectedMiss: []querycache.MissInterval{
                 {
-                    start: 1675115596722,
-                    end:   1675115596722 + 60*60*1000 - 1,
+                    Start: 1675115596722,
+                    End:   1675115596722 + 60*60*1000,
                 },
             },
         },
@@ -390,27 +434,45 @@ func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
                 },
             },
             fluxInterval: 5 * time.Minute,
-            expectedMiss: []missInterval{
+            expectedMiss: []querycache.MissInterval{
                 {
-                    start: 1675115596722,
-                    end:   1675115596722 + 180*60*1000,
+                    Start: 1675115596722,
+                    End:   1675115596722 + 180*60*1000,
                 },
             },
         },
     }

-    for _, tc := range testCases {
+    c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
+    qc := querycache.NewQueryCache(querycache.WithCache(c))
+
+    for idx, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            misses, _ := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval)
+            cacheKey := fmt.Sprintf("test-cache-key-%d", idx)
+            cachedData := &querycache.CachedSeriesData{
+                Start: minTimestamp(tc.cachedSeries),
+                End:   maxTimestamp(tc.cachedSeries),
+                Data:  tc.cachedSeries,
+            }
+            jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData})
+            if err != nil {
+                t.Errorf("error marshalling cached data: %v", err)
+            }
+            err = c.Store(cacheKey, jsonData, 5*time.Minute)
+            if err != nil {
+                t.Errorf("error storing cached data: %v", err)
+            }
+            misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
             if len(misses) != len(tc.expectedMiss) {
                 t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
             }
             for i, miss := range misses {
-                if miss.start != tc.expectedMiss[i].start {
-                    t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start)
+                if miss.Start != tc.expectedMiss[i].Start {
+                    t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start)
                 }
-                if miss.end != tc.expectedMiss[i].end {
-                    t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end)
+                if miss.End != tc.expectedMiss[i].End {
+                    t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End)
                 }
             }
         })
@@ -1022,18 +1084,18 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
     expectedQueryAndTimeRanges := []struct {
         query  string
-        ranges []missInterval
+        ranges []querycache.MissInterval
     }{
         {
             query: "signoz_calls_total",
-            ranges: []missInterval{
-                {start: 1675115596722, end: 1675115596722 + 120*60*1000},
+            ranges: []querycache.MissInterval{
+                {Start: 1675115596722, End: 1675115596722 + 120*60*1000},
             },
         },
         {
             query: "signoz_latency_bucket",
-            ranges: []missInterval{
-                {start: 1675115596722 + 60*60*1000, end: 1675115596722 + 180*60*1000},
+            ranges: []querycache.MissInterval{
+                {Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000},
             },
         },
     }
@@ -1054,10 +1116,10 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
         if len(q.TimeRanges()[i]) != 2 {
             t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
         }
-        if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].start) {
+        if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].Start) {
             t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
         }
-        if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].end) {
+        if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].End) {
             t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
         }
     }
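Two behavioural details are visible in these updated expectations. First, the tests now exercise a real inmemory cache shared across subtests, hence the per-case fmt.Sprintf("test-cache-key-%d", idx) keys. Second, the expected boundaries lost their plus/minus-one adjustments: the old missInterval stopped one millisecond short of cached data on either side, while the new MissInterval ends exactly on the cached boundary, i.e. half-open semantics. A tiny before/after illustration with values copied from the expectations above (the example function itself is hypothetical):

func exampleBoundaryChange() {
    base := int64(1675115596722)
    hour := int64(60 * 60 * 1000)

    // old expectation: the miss ended just short of the cached range
    oldMissEnd := base + hour - 1

    // new expectation: the miss ends exactly on the cached boundary
    newMiss := querycache.MissInterval{Start: base, End: base + hour}

    fmt.Println(oldMissEnd, newMiss.End) // differ by exactly 1 ms
}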


@@ -2,20 +2,19 @@ package v2

 import (
     "context"
-    "encoding/json"
     "fmt"
     "strings"
     "sync"
-    "time"

     logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
     logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
     metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
     metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
     tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
-    "go.signoz.io/signoz/pkg/query-service/cache/status"
+    "go.signoz.io/signoz/pkg/query-service/common"
     "go.signoz.io/signoz/pkg/query-service/constants"
     v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+    "go.signoz.io/signoz/pkg/query-service/querycache"
     "go.uber.org/zap"
 )
@@ -108,7 +107,8 @@ func (q *querier) runBuilderQuery(
     if builderQuery.DataSource == v3.DataSourceLogs {
         var query string
         var err error
-        if _, ok := cacheKeys[queryName]; !ok {
+        if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
+            zap.L().Info("skipping cache for logs query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
             query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, start, end, builderQuery, params, preferRPM)
             if err != nil {
                 ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
@@ -118,21 +118,11 @@ func (q *querier) runBuilderQuery(
             ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
             return
         }
-        cacheKey := cacheKeys[queryName]
-        var cachedData []byte
-        if !params.NoCache && q.cache != nil {
-            var retrieveStatus status.RetrieveStatus
-            data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
-            zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
-            if err == nil {
-                cachedData = data
-            }
-        }
-        misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
-        missedSeries := make([]*v3.Series, 0)
-        cachedSeries := make([]*v3.Series, 0)
+        misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
+        zap.L().Info("cache misses for logs query", zap.Any("misses", misses))
+        missedSeries := make([]querycache.CachedSeriesData, 0)
         for _, miss := range misses {
-            query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.start, miss.end, builderQuery, params, preferRPM)
+            query, err = prepareLogsQuery(ctx, q.UseLogsNewSchema, miss.Start, miss.End, builderQuery, params, preferRPM)
             if err != nil {
                 ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
                 return
@@ -147,43 +137,20 @@ func (q *querier) runBuilderQuery(
             }
             return
         }
-            missedSeries = append(missedSeries, series...)
+            missedSeries = append(missedSeries, querycache.CachedSeriesData{
+                Data:  series,
+                Start: miss.Start,
+                End:   miss.End,
+            })
         }
-        if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
-            zap.L().Error("error unmarshalling cached data", zap.Error(err))
-        }
-        mergedSeries := mergeSerieses(cachedSeries, missedSeries)
-        if replaceCachedData {
-            mergedSeries = missedSeries
-        }
-        var mergedSeriesData []byte
-        var marshallingErr error
-        missedSeriesLen := len(missedSeries)
-        if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
-            // caching the data
-            mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
-            if marshallingErr != nil {
-                zap.L().Error("error marshalling merged series", zap.Error(marshallingErr))
-            }
-        }
-        // response doesn't need everything
-        filterCachedPoints(mergedSeries, start, end)
+        mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)
+        resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
         ch <- channelResult{
             Err:    nil,
             Name:   queryName,
-            Series: mergedSeries,
+            Series: resultSeries,
         }
-        // Cache the seriesList for future queries
-        if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
-            // caching the data
-            err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
-            if err != nil {
-                zap.L().Error("error storing merged series", zap.Error(err))
-                return
-            }
-        }
         return
@@ -240,7 +207,8 @@ func (q *querier) runBuilderQuery(
         // What is happening here?
         // We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
        // If the query is not cached, we execute the query and return the result without caching it.
-        if _, ok := cacheKeys[queryName]; !ok {
+        if _, ok := cacheKeys[queryName]; !ok || params.NoCache {
+            zap.L().Info("skipping cache for metrics query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
             query, err := metricsV4.PrepareMetricQuery(start, end, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
             if err != nil {
                 ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return return
} }
cacheKey := cacheKeys[queryName] misses := q.queryCache.FindMissingTimeRanges(start, end, builderQuery.StepInterval, cacheKeys[queryName])
var cachedData []byte zap.L().Info("cache misses for metrics query", zap.Any("misses", misses))
if !params.NoCache && q.cache != nil { missedSeries := make([]querycache.CachedSeriesData, 0)
var retrieveStatus status.RetrieveStatus
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err == nil {
cachedData = data
}
}
misses, replaceCachedData := q.findMissingTimeRanges(start, end, builderQuery.StepInterval, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses { for _, miss := range misses {
query, err := metricsV4.PrepareMetricQuery( query, err := metricsV4.PrepareMetricQuery(
miss.start, miss.Start,
miss.end, miss.End,
params.CompositeQuery.QueryType, params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType, params.CompositeQuery.PanelType,
builderQuery, builderQuery,
@@ -292,41 +250,19 @@ func (q *querier) runBuilderQuery(
             }
             return
         }
-            missedSeries = append(missedSeries, series...)
+            missedSeries = append(missedSeries, querycache.CachedSeriesData{
+                Data:  series,
+                Start: miss.Start,
+                End:   miss.End,
+            })
         }
-        if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
-            zap.L().Error("error unmarshalling cached data", zap.Error(err))
-        }
-        mergedSeries := mergeSerieses(cachedSeries, missedSeries)
-        if replaceCachedData {
-            mergedSeries = missedSeries
-        }
-        var mergedSeriesData []byte
-        var marshallingErr error
-        missedSeriesLen := len(missedSeries)
-        if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
-            // caching the data
-            mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
-            if marshallingErr != nil {
-                zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
-            }
-        }
-        // response doesn't need everything
-        filterCachedPoints(mergedSeries, start, end)
+        mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKeys[queryName], missedSeries)
+        resultSeries := common.GetSeriesFromCachedData(mergedSeries, start, end)
         ch <- channelResult{
             Err:    nil,
             Name:   queryName,
-            Series: mergedSeries,
+            Series: resultSeries,
         }
-        // Cache the seriesList for future queries
-        if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
-            err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
-            if err != nil {
-                zap.L().Error("error storing merged series", zap.Error(err))
-                return
-            }
-        }
     }


@@ -2,11 +2,7 @@ package v2

 import (
     "context"
-    "encoding/json"
     "fmt"
-    "math"
-    "sort"
-    "strings"
     "sync"
     "time"
@@ -15,7 +11,9 @@ import (
     metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4"
     "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
     tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
+    "go.signoz.io/signoz/pkg/query-service/common"
     chErrors "go.signoz.io/signoz/pkg/query-service/errors"
+    "go.signoz.io/signoz/pkg/query-service/querycache"
     "go.signoz.io/signoz/pkg/query-service/utils"

     "go.signoz.io/signoz/pkg/query-service/cache"
@@ -34,14 +32,11 @@ type channelResult struct {
     Query  string
 }

-type missInterval struct {
-    start, end int64 // in milliseconds
-}
-
 type querier struct {
     cache        cache.Cache
     reader       interfaces.Reader
     keyGenerator cache.KeyGenerator
+    queryCache   interfaces.QueryCache

     fluxInterval time.Duration
@@ -79,8 +74,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
         logsQueryBuilder = logsV4.PrepareLogsQuery
     }

+    qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval))
+
     return &querier{
         cache:        opts.Cache,
+        queryCache:   qc,
         reader:       opts.Reader,
         keyGenerator: opts.KeyGenerator,
         fluxInterval: opts.FluxInterval,
@@ -157,167 +155,6 @@ func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangeParams)
     return seriesList, nil
 }

-// findMissingTimeRanges finds the missing time ranges in the seriesList
-// and returns a list of miss structs. It takes the fluxInterval into
-// account to find the missing time ranges.
-//
-// The [End - fluxInterval, End] is always added to the list of misses, because
-// the data might still be in flux and not yet available in the database.
-//
-// replaceCacheData is used to indicate if the cache data should be replaced instead of merging
-// with the new data
-// TODO: Remove replaceCacheData with a better logic
-func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval, replaceCacheData bool) {
-    replaceCacheData = false
-    var cachedStart, cachedEnd int64
-    for idx := range seriesList {
-        series := seriesList[idx]
-        for pointIdx := range series.Points {
-            point := series.Points[pointIdx]
-            if cachedStart == 0 || point.Timestamp < cachedStart {
-                cachedStart = point.Timestamp
-            }
-            if cachedEnd == 0 || point.Timestamp > cachedEnd {
-                cachedEnd = point.Timestamp
-            }
-        }
-    }
-
-    // time.Now is used because here we are considering the case where data might not
-    // be fully ingested for the last (fluxInterval) minutes
-    endMillis := time.Now().UnixMilli()
-    adjustStep := int64(math.Min(float64(step), 60))
-    roundedMillis := endMillis - (endMillis % (adjustStep * 1000))
-
-    // Exclude the flux interval from the cached end time
-    cachedEnd = int64(
-        math.Min(
-            float64(cachedEnd),
-            float64(roundedMillis-fluxInterval.Milliseconds()),
-        ),
-    )
-
-    // There are five cases to consider:
-    // 1. Cached time range is a subset of the requested time range
-    // 2. Cached time range is a superset of the requested time range
-    // 3. Cached time range is a left overlap of the requested time range
-    // 4. Cached time range is a right overlap of the requested time range
-    // 5. Cached time range is disjoint from the requested time range
-    if cachedStart >= start && cachedEnd <= end {
-        // Case 1: cached time range is a subset of the requested time range
-        // Add misses for the left and right sides of the cached time range
-        misses = append(misses, missInterval{start: start, end: cachedStart - 1})
-        misses = append(misses, missInterval{start: cachedEnd + 1, end: end})
-    } else if cachedStart <= start && cachedEnd >= end {
-        // Case 2: cached time range is a superset of the requested time range
-        // No misses
-    } else if cachedStart <= start && cachedEnd >= start {
-        // Case 3: cached time range is a left overlap of the requested time range
-        // Add a miss for the part to the right of the cached time range
-        misses = append(misses, missInterval{start: cachedEnd + 1, end: end})
-    } else if cachedStart <= end && cachedEnd >= end {
-        // Case 4: cached time range is a right overlap of the requested time range
-        // Add a miss for the part to the left of the cached time range
-        misses = append(misses, missInterval{start: start, end: cachedStart - 1})
-    } else {
-        // Case 5: cached time range is disjoint from the requested time range
-        // Add a miss for the entire requested time range
-        misses = append(misses, missInterval{start: start, end: end})
-        replaceCacheData = true
-    }
-
-    // remove the structs with start > end
-    var validMisses []missInterval
-    for idx := range misses {
-        miss := misses[idx]
-        if miss.start < miss.end {
-            validMisses = append(validMisses, miss)
-        }
-    }
-    return validMisses, replaceCacheData
-}
-
-// findMissingTimeRanges finds the missing time ranges in the cached data
-// and returns them as a list of misses
-func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval, replaceCachedData bool) {
-    var cachedSeriesList []*v3.Series
-    if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil {
-        // In case of error, we return the entire range as a miss
-        return []missInterval{{start: start, end: end}}, true
-    }
-    return findMissingTimeRanges(start, end, step, cachedSeriesList, q.fluxInterval)
-}
-
-// labelsToString converts the labels map to a string
-// sorted by key so that the string is consistent
-// across different runs
-func labelsToString(labels map[string]string) string {
-    type label struct {
-        Key   string
-        Value string
-    }
-    var labelsList []label
-    for k, v := range labels {
-        labelsList = append(labelsList, label{Key: k, Value: v})
-    }
-    sort.Slice(labelsList, func(i, j int) bool {
-        return labelsList[i].Key < labelsList[j].Key
-    })
-    labelKVs := make([]string, len(labelsList))
-    for idx := range labelsList {
-        labelKVs[idx] = labelsList[idx].Key + "=" + labelsList[idx].Value
-    }
-    return fmt.Sprintf("{%s}", strings.Join(labelKVs, ","))
-}
-
-// filterCachedPoints filters the points in the series list
-// that are outside the start and end time range
-// and returns the filtered series list
-// TODO(srikanthccv): is this really needed?
-func filterCachedPoints(cachedSeries []*v3.Series, start, end int64) {
-    for _, c := range cachedSeries {
-        points := []v3.Point{}
-        for _, p := range c.Points {
-            if (p.Timestamp < start || p.Timestamp > end) && p.Timestamp != 0 {
-                continue
-            }
-            points = append(points, p)
-        }
-        c.Points = points
-    }
-}
-
-// mergeSerieses merges the cached series and the missed series
-// and returns the merged series list
-func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
-    // Merge the missed series with the cached series by timestamp
-    mergedSeries := make([]*v3.Series, 0)
-    seriesesByLabels := make(map[string]*v3.Series)
-    for idx := range cachedSeries {
-        series := cachedSeries[idx]
-        seriesesByLabels[labelsToString(series.Labels)] = series
-    }
-    for idx := range missedSeries {
-        series := missedSeries[idx]
-        if _, ok := seriesesByLabels[labelsToString(series.Labels)]; !ok {
-            seriesesByLabels[labelsToString(series.Labels)] = series
-            continue
-        }
-        seriesesByLabels[labelsToString(series.Labels)].Points = append(seriesesByLabels[labelsToString(series.Labels)].Points, series.Points...)
-    }
-    // Sort the points in each series by timestamp
-    // and remove duplicate points
-    for idx := range seriesesByLabels {
-        series := seriesesByLabels[idx]
-        series.SortPoints()
-        series.RemoveDuplicatePoints()
-        mergedSeries = append(mergedSeries, series)
-    }
-    return mergedSeries
-}
-
 func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
     cacheKeys := q.keyGenerator.GenerateKeys(params)
@ -372,50 +209,33 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
go func(queryName string, promQuery *v3.PromQuery) { go func(queryName string, promQuery *v3.PromQuery) {
defer wg.Done() defer wg.Done()
cacheKey, ok := cacheKeys[queryName] cacheKey, ok := cacheKeys[queryName]
var cachedData []byte
// Ensure NoCache is not set and cache is not nil if !ok || params.NoCache {
if !params.NoCache && q.cache != nil && ok { zap.L().Info("skipping cache for metrics prom query", zap.String("queryName", queryName), zap.Int64("start", params.Start), zap.Int64("end", params.End), zap.Int64("step", params.Step), zap.Bool("noCache", params.NoCache), zap.String("cacheKey", cacheKeys[queryName]))
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) query := metricsV4.BuildPromQuery(promQuery, params.Step, params.Start, params.End)
zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) series, err := q.execPromQuery(ctx, query)
if err == nil { channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: series}
cachedData = data return
}
} }
misses, replaceCachedData := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) misses := q.queryCache.FindMissingTimeRanges(params.Start, params.End, params.Step, cacheKey)
missedSeries := make([]*v3.Series, 0) zap.L().Info("cache misses for metrics prom query", zap.Any("misses", misses))
cachedSeries := make([]*v3.Series, 0) missedSeries := make([]querycache.CachedSeriesData, 0)
for _, miss := range misses { for _, miss := range misses {
query := metricsV4.BuildPromQuery(promQuery, params.Step, miss.start, miss.end) query := metricsV4.BuildPromQuery(promQuery, params.Step, miss.Start, miss.End)
series, err := q.execPromQuery(ctx, query) series, err := q.execPromQuery(ctx, query)
if err != nil { if err != nil {
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil} channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil}
return return
} }
missedSeries = append(missedSeries, series...) missedSeries = append(missedSeries, querycache.CachedSeriesData{
} Data: series,
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { Start: miss.Start,
// ideally we should not be getting an error here End: miss.End,
zap.L().Error("error unmarshalling cached data", zap.Error(err)) })
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
if replaceCachedData {
mergedSeries = missedSeries
}
channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries}
// Cache the seriesList for future queries
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && ok {
mergedSeriesData, err := json.Marshal(mergedSeries)
if err != nil {
zap.L().Error("error marshalling merged series", zap.Error(err))
return
}
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.L().Error("error storing merged series", zap.Error(err))
return
}
} }
mergedSeries := q.queryCache.MergeWithCachedSeriesData(cacheKey, missedSeries)
resultSeries := common.GetSeriesFromCachedData(mergedSeries, params.Start, params.End)
channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: resultSeries}
}(queryName, promQuery) }(queryName, promQuery)
} }
wg.Wait() wg.Wait()
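The new flow is the same shape for every signal: compute the gaps, query only the gaps, merge, then trim. A condensed sketch of that shape follows; the queryWithCache helper and its execute callback are illustrative, not part of this commit, and the inline interface simply mirrors the QueryCache interface added below.

// Hypothetical helper showing the miss-driven flow used above.
func queryWithCache(
	qc interface {
		FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval
		MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData
	},
	cacheKey string, start, end, step int64,
	execute func(start, end int64) ([]*v3.Series, error),
) ([]*v3.Series, error) {
	// 1. Ask the cache which sub-ranges of [start, end) are not covered.
	misses := qc.FindMissingTimeRanges(start, end, step, cacheKey)
	missed := make([]querycache.CachedSeriesData, 0, len(misses))
	for _, m := range misses {
		// 2. Hit the datastore only for the gaps.
		series, err := execute(m.Start, m.End)
		if err != nil {
			return nil, err
		}
		missed = append(missed, querycache.CachedSeriesData{Start: m.Start, End: m.End, Data: series})
	}
	// 3. Merge the fresh buckets into the cache, then 4. trim to the window.
	merged := qc.MergeWithCachedSeriesData(cacheKey, missed)
	return common.GetSeriesFromCachedData(merged, start, end), nil
}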

View File

@@ -2,7 +2,9 @@ package v2
import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
"testing"
"time"
@@ -11,9 +13,34 @@ import (
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache"
)
func minTimestamp(series []*v3.Series) int64 {
min := int64(math.MaxInt64)
for _, series := range series {
for _, point := range series.Points {
if point.Timestamp < min {
min = point.Timestamp
}
}
}
return min
}
func maxTimestamp(series []*v3.Series) int64 {
max := int64(math.MinInt64)
for _, series := range series {
for _, point := range series.Points {
if point.Timestamp > max {
max = point.Timestamp
}
}
}
return max
}
func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) {
// There are five scenarios:
// 1. Cached time range is a subset of the requested time range
// 2. Cached time range is a superset of the requested time range
@@ -26,7 +53,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
requestedEnd int64 // in milliseconds
requestedStep int64 // in seconds
cachedSeries []*v3.Series
expectedMiss []querycache.MissInterval
replaceCachedData bool
}{
{
@@ -51,14 +78,14 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
},
{
Start: 1675115596722 + 120*60*1000,
End: 1675115596722 + 180*60*1000,
},
},
},
@@ -92,7 +119,7 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{},
},
{
name: "cached time range is a left overlap of the requested time range",
@@ -120,10 +147,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
{
Start: 1675115596722 + 120*60*1000,
End: 1675115596722 + 180*60*1000,
},
},
},
@@ -153,10 +180,10 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
},
},
},
@@ -186,31 +213,48 @@ func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
},
},
},
expectedMiss: []querycache.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 180*60*1000,
},
},
replaceCachedData: true,
},
}
c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
qc := querycache.NewQueryCache(querycache.WithCache(c))
for idx, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cacheKey := fmt.Sprintf("test-cache-key-%d", idx)
cachedData := &querycache.CachedSeriesData{
Start: minTimestamp(tc.cachedSeries),
End: maxTimestamp(tc.cachedSeries),
Data: tc.cachedSeries,
}
jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData})
if err != nil {
t.Errorf("error marshalling cached data: %v", err)
}
err = c.Store(cacheKey, jsonData, 5*time.Minute)
if err != nil {
t.Errorf("error storing cached data: %v", err)
}
misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
for i, miss := range misses {
if miss.Start != tc.expectedMiss[i].Start {
t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start)
}
if miss.End != tc.expectedMiss[i].End {
t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End)
}
}
})
@@ -226,7 +270,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
requestedStep int64
cachedSeries []*v3.Series
fluxInterval time.Duration
expectedMiss []querycache.MissInterval
}{
{
name: "cached time range is a subset of the requested time range",
@@ -251,14 +295,14 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
},
{
Start: 1675115596722 + 120*60*1000,
End: 1675115596722 + 180*60*1000,
},
},
},
@@ -293,7 +337,7 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{},
},
{
name: "cache time range is a left overlap of the requested time range",
@@ -322,10 +366,10 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
{
Start: 1675115596722 + 120*60*1000,
End: 1675115596722 + 180*60*1000,
},
},
},
@@ -356,10 +400,10 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 60*60*1000,
},
},
},
@@ -390,27 +434,47 @@ func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) {
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []querycache.MissInterval{
{
Start: 1675115596722,
End: 1675115596722 + 180*60*1000,
},
},
},
}
c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
qc := querycache.NewQueryCache(querycache.WithCache(c))
for idx, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cacheKey := fmt.Sprintf("test-cache-key-%d", idx)
cachedData := &querycache.CachedSeriesData{
Start: minTimestamp(tc.cachedSeries),
End: maxTimestamp(tc.cachedSeries),
Data: tc.cachedSeries,
}
jsonData, err := json.Marshal([]*querycache.CachedSeriesData{cachedData})
if err != nil {
t.Errorf("error marshalling cached data: %v", err)
return
}
err = c.Store(cacheKey, jsonData, 5*time.Minute)
if err != nil {
t.Errorf("error storing cached data: %v", err)
return
}
misses := qc.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, cacheKey)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
for i, miss := range misses {
if miss.Start != tc.expectedMiss[i].Start {
t.Errorf("expected start %d, got %d", tc.expectedMiss[i].Start, miss.Start)
}
if miss.End != tc.expectedMiss[i].End {
t.Errorf("expected end %d, got %d", tc.expectedMiss[i].End, miss.End)
}
}
})
@@ -1074,18 +1138,18 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
expectedQueryAndTimeRanges := []struct {
query string
ranges []querycache.MissInterval
}{
{
query: "signoz_calls_total",
ranges: []querycache.MissInterval{
{Start: 1675115596722, End: 1675115596722 + 120*60*1000},
},
},
{
query: "signoz_latency_bucket",
ranges: []querycache.MissInterval{
{Start: 1675115596722 + 60*60*1000, End: 1675115596722 + 180*60*1000},
},
},
}
@@ -1106,10 +1170,10 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
if len(q.TimeRanges()[i]) != 2 {
t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
}
if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].Start) {
t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
}
if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].End) {
t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
}
}

View File

@@ -6,6 +6,8 @@ import (
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
)
func AdjustedMetricTimeRange(start, end, step int64, mq v3.BuilderQuery) (int64, int64) {
@@ -70,3 +72,35 @@ func LCMList(nums []int64) int64 {
}
return result
}
func GetSeriesFromCachedData(data []querycache.CachedSeriesData, start, end int64) []*v3.Series {
series := make(map[uint64]*v3.Series)
for _, cachedData := range data {
for _, data := range cachedData.Data {
h := labels.FromMap(data.Labels).Hash()
if _, ok := series[h]; !ok {
series[h] = &v3.Series{
Labels: data.Labels,
LabelsArray: data.LabelsArray,
Points: make([]v3.Point, 0),
}
}
for _, point := range data.Points {
if point.Timestamp >= start && point.Timestamp <= end {
series[h].Points = append(series[h].Points, point)
}
}
}
}
newSeries := make([]*v3.Series, 0, len(series))
for _, s := range series {
s.SortPoints()
s.RemoveDuplicatePoints()
newSeries = append(newSeries, s)
}
return newSeries
}
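A small usage sketch of the helper above, with made-up values and fmt output: points outside the requested window are dropped, and series from different cached buckets that share a label set are folded together.

data := []querycache.CachedSeriesData{
	{Start: 1000, End: 2000, Data: []*v3.Series{{Labels: map[string]string{"svc": "a"}, Points: []v3.Point{{Timestamp: 1500, Value: 1}}}}},
	{Start: 2000, End: 3000, Data: []*v3.Series{{Labels: map[string]string{"svc": "a"}, Points: []v3.Point{{Timestamp: 2500, Value: 2}, {Timestamp: 3500, Value: 3}}}}},
}
series := GetSeriesFromCachedData(data, 1000, 3000)
// The two buckets collapse into one {svc="a"} series with points at 1500
// and 2500; the point at 3500 falls outside the window and is dropped.
fmt.Printf("%d series, %d points\n", len(series), len(series[0].Points)) // "1 series, 2 points"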

View File

@@ -10,6 +10,7 @@ import (
"github.com/prometheus/prometheus/util/stats"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache"
)
type Reader interface {
@@ -121,3 +122,8 @@ type Querier interface {
QueriesExecuted() []string
TimeRanges() [][]int
}
type QueryCache interface {
FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval
MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData
}
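Because the querier depends on this interface rather than on the concrete cache type, swapping in a test double is trivial. A no-op variant might look like the following; it is illustrative only, not part of the commit, and treats every request as a full miss.

// noopQueryCache satisfies QueryCache without persisting anything.
type noopQueryCache struct{}

func (noopQueryCache) FindMissingTimeRanges(start, end int64, step int64, cacheKey string) []querycache.MissInterval {
	// Everything is a miss: callers always go to the datastore.
	return []querycache.MissInterval{{Start: start, End: end}}
}

func (noopQueryCache) MergeWithCachedSeriesData(cacheKey string, newData []querycache.CachedSeriesData) []querycache.CachedSeriesData {
	// Nothing is merged or stored; hand back only the fresh data.
	return newData
}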

View File

@@ -0,0 +1,225 @@
package querycache
import (
"encoding/json"
"math"
"sort"
"time"
"go.signoz.io/signoz/pkg/query-service/cache"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils/labels"
"go.uber.org/zap"
)
type queryCache struct {
cache cache.Cache
fluxInterval time.Duration
}
type MissInterval struct {
Start, End int64 // in milliseconds
}
type CachedSeriesData struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Data []*v3.Series `json:"data"`
}
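// Illustrative only: each cache key stores a JSON array of these buckets,
// so two disjoint cached windows would serialize roughly as
//
//	buckets := []CachedSeriesData{
//		{Start: 1000, End: 2000, Data: []*v3.Series{}},
//		{Start: 3000, End: 4000, Data: []*v3.Series{}},
//	}
//	b, _ := json.Marshal(buckets)
//	// [{"start":1000,"end":2000,"data":[]},{"start":3000,"end":4000,"data":[]}]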
type QueryCacheOption func(q *queryCache)
func NewQueryCache(opts ...QueryCacheOption) *queryCache {
q := &queryCache{}
for _, opt := range opts {
opt(q)
}
return q
}
func WithCache(cache cache.Cache) QueryCacheOption {
return func(q *queryCache) {
q.cache = cache
}
}
func WithFluxInterval(fluxInterval time.Duration) QueryCacheOption {
return func(q *queryCache) {
q.fluxInterval = fluxInterval
}
}
func (q *queryCache) FindMissingTimeRanges(start, end, step int64, cacheKey string) []MissInterval {
if q.cache == nil || cacheKey == "" {
return []MissInterval{{Start: start, End: end}}
}
cachedSeriesDataList := q.getCachedSeriesData(cacheKey)
// Sort the cached data by start time
sort.Slice(cachedSeriesDataList, func(i, j int) bool {
return cachedSeriesDataList[i].Start < cachedSeriesDataList[j].Start
})
zap.L().Info("Number of non-overlapping cached series data", zap.Int("count", len(cachedSeriesDataList)))
// Exclude the flux interval from the cached end time
// Why do we use `time.Now()` here?
// When querying for a range [start, now())
// we don't want to use the cached data inside the flux interval period
// because the data in the flux interval period might not be fully ingested
// and should not be used for caching.
// This is not an issue if the end time is before now() - fluxInterval
endMillis := time.Now().UnixMilli()
adjustStep := int64(math.Min(float64(step), 60))
roundedMillis := endMillis - (endMillis % (adjustStep * 1000))
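// Worked example (assumed values): a step of 300s is capped to adjustStep =
// 60s, so endMillis = 1675118000500 has remainder 20500 modulo 60000 and
// roundedMillis = 1675117980000; with a 5m flux interval the last cached
// bucket below is then clamped to at most roundedMillis - 300000.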
if len(cachedSeriesDataList) > 0 {
lastCachedData := cachedSeriesDataList[len(cachedSeriesDataList)-1]
lastCachedData.End = int64(
math.Min(
float64(lastCachedData.End),
float64(roundedMillis-q.fluxInterval.Milliseconds()),
),
)
}
var missingRanges []MissInterval
currentTime := start
for _, data := range cachedSeriesDataList {
// Ignore cached data that ends before the start time
if data.End <= start {
continue
}
// Stop processing if we've reached the end time
if data.Start >= end {
break
}
// Add missing range if there's a gap
if currentTime < data.Start {
missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: min(data.Start, end)})
}
// Update currentTime, but don't go past the end time
currentTime = max(currentTime, min(data.End, end))
}
// Add final missing range if necessary
if currentTime < end {
missingRanges = append(missingRanges, MissInterval{Start: currentTime, End: end})
}
return missingRanges
}
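// Walk-through of the sweep above with assumed numbers: requesting
// [1000, 5000) against cached buckets [1100, 1200] and [1400, 1500] moves
// currentTime 1000 -> 1200 -> 1500 and emits the misses [1000, 1100],
// [1200, 1400] and [1500, 5000]; only those three ranges are re-queried.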
func (q *queryCache) getCachedSeriesData(cacheKey string) []*CachedSeriesData {
cachedData, _, _ := q.cache.Retrieve(cacheKey, true)
var cachedSeriesDataList []*CachedSeriesData
if err := json.Unmarshal(cachedData, &cachedSeriesDataList); err != nil {
return nil
}
return cachedSeriesDataList
}
func (q *queryCache) mergeSeries(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
// Merge the missed series with the cached series by timestamp
mergedSeries := make([]*v3.Series, 0)
seriesesByLabels := make(map[uint64]*v3.Series)
for idx := range cachedSeries {
series := cachedSeries[idx]
seriesesByLabels[labels.FromMap(series.Labels).Hash()] = series
}
for idx := range missedSeries {
series := missedSeries[idx]
h := labels.FromMap(series.Labels).Hash()
if _, ok := seriesesByLabels[h]; !ok {
seriesesByLabels[h] = series
continue
}
seriesesByLabels[h].Points = append(seriesesByLabels[h].Points, series.Points...)
}
// Sort the points in each series by timestamp
for idx := range seriesesByLabels {
series := seriesesByLabels[idx]
series.SortPoints()
series.RemoveDuplicatePoints()
mergedSeries = append(mergedSeries, series)
}
return mergedSeries
}
func (q *queryCache) storeMergedData(cacheKey string, mergedData []CachedSeriesData) {
mergedDataJSON, err := json.Marshal(mergedData)
if err != nil {
zap.L().Error("error marshalling merged data", zap.Error(err))
return
}
err = q.cache.Store(cacheKey, mergedDataJSON, 0)
if err != nil {
zap.L().Error("error storing merged data", zap.Error(err))
}
}
func (q *queryCache) MergeWithCachedSeriesData(cacheKey string, newData []CachedSeriesData) []CachedSeriesData {
if q.cache == nil {
return newData
}
cachedData, _, _ := q.cache.Retrieve(cacheKey, true)
var existingData []CachedSeriesData
if err := json.Unmarshal(cachedData, &existingData); err != nil {
// If the cached payload cannot be parsed, discard it and overwrite the cache with the new data
q.storeMergedData(cacheKey, newData)
return newData
}
allData := append(existingData, newData...)
sort.Slice(allData, func(i, j int) bool {
return allData[i].Start < allData[j].Start
})
var mergedData []CachedSeriesData
var current *CachedSeriesData
for _, data := range allData {
if current == nil {
current = &CachedSeriesData{
Start: data.Start,
End: data.End,
Data: data.Data,
}
continue
}
if data.Start <= current.End {
// Overlapping intervals, merge them
current.End = max(current.End, data.End)
current.Start = min(current.Start, data.Start)
// Merge the Data fields
current.Data = q.mergeSeries(current.Data, data.Data)
} else {
// No overlap, add current to mergedData
mergedData = append(mergedData, *current)
// Start new current
current = &CachedSeriesData{
Start: data.Start,
End: data.End,
Data: data.Data,
}
}
}
// After the loop, add the last current
if current != nil {
mergedData = append(mergedData, *current)
}
q.storeMergedData(cacheKey, mergedData)
return mergedData
}
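End to end, the overlap coalescing above can be exercised against the in-memory cache. A minimal sketch, using the same constructors and imports as the tests that follow; the key, ranges, and fmt output are illustrative:

c := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
qc := querycache.NewQueryCache(querycache.WithCache(c))
key := "illustrative-key"
// The first write lands in an empty cache and is stored as-is.
qc.MergeWithCachedSeriesData(key, []querycache.CachedSeriesData{{Start: 1000, End: 2000, Data: []*v3.Series{}}})
// The second window overlaps [1000, 2000], so the two collapse into one bucket.
merged := qc.MergeWithCachedSeriesData(key, []querycache.CachedSeriesData{{Start: 1500, End: 2500, Data: []*v3.Series{}}})
fmt.Printf("%d bucket, [%d, %d]\n", len(merged), merged[0].Start, merged[0].End) // "1 bucket, [1000, 2500]"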

View File

@@ -0,0 +1,336 @@
package querycache_test
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache"
)
func TestFindMissingTimeRanges(t *testing.T) {
// Initialize the mock cache
mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
// Create a queryCache instance with the mock cache and a fluxInterval
q := querycache.NewQueryCache(
querycache.WithCache(mockCache),
querycache.WithFluxInterval(0), // Set to zero for testing purposes
)
// Define the test cases
testCases := []struct {
name string
requestedStart int64 // in milliseconds
requestedEnd int64 // in milliseconds
step int64 // in seconds
cacheKey string
cachedData []querycache.CachedSeriesData
expectedMiss []querycache.MissInterval
}{
{
name: "Cached time range is a subset of the requested time range",
requestedStart: 1000,
requestedEnd: 5000,
step: 60,
cacheKey: "testKey1",
cachedData: []querycache.CachedSeriesData{
{
Start: 2000,
End: 3000,
Data: []*v3.Series{}, // Data can be empty for this test
},
},
expectedMiss: []querycache.MissInterval{
{Start: 1000, End: 2000},
{Start: 3000, End: 5000},
},
},
{
name: "Cached time range is a superset of the requested time range",
requestedStart: 2000,
requestedEnd: 3000,
step: 60,
cacheKey: "testKey2",
cachedData: []querycache.CachedSeriesData{
{
Start: 1000,
End: 4000,
Data: []*v3.Series{},
},
},
expectedMiss: nil, // No missing intervals
},
{
name: "Cached time range is a left overlap of the requested time range",
requestedStart: 2000,
requestedEnd: 4000,
step: 60,
cacheKey: "testKey3",
cachedData: []querycache.CachedSeriesData{
{
Start: 1000,
End: 2500,
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 2500, End: 4000},
},
},
{
name: "Cached time range is a right overlap of the requested time range",
requestedStart: 2000,
requestedEnd: 4000,
step: 60,
cacheKey: "testKey4",
cachedData: []querycache.CachedSeriesData{
{
Start: 3500,
End: 5000,
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 2000, End: 3500},
},
},
{
name: "Cached time range is disjoint from the requested time range",
requestedStart: 2000,
requestedEnd: 4000,
step: 60,
cacheKey: "testKey5",
cachedData: []querycache.CachedSeriesData{
{
Start: 5000,
End: 6000,
Data: []*v3.Series{},
},
},
expectedMiss: []querycache.MissInterval{
{Start: 2000, End: 4000},
},
},
// Additional test cases for non-overlapping cached data
{
name: "Multiple non-overlapping cached intervals within requested range",
requestedStart: 1000,
requestedEnd: 5000,
step: 60,
cacheKey: "testKey6",
cachedData: []querycache.CachedSeriesData{
{Start: 1100, End: 1200, Data: []*v3.Series{}},
{Start: 1300, End: 1400, Data: []*v3.Series{}},
{Start: 1500, End: 1600, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
{Start: 1000, End: 1100},
{Start: 1200, End: 1300},
{Start: 1400, End: 1500},
{Start: 1600, End: 5000},
},
},
{
name: "Cached intervals covering some parts with gaps",
requestedStart: 1000,
requestedEnd: 2000,
step: 60,
cacheKey: "testKey7",
cachedData: []querycache.CachedSeriesData{
{Start: 1000, End: 1100, Data: []*v3.Series{}},
{Start: 1200, End: 1300, Data: []*v3.Series{}},
{Start: 1400, End: 1500, Data: []*v3.Series{}},
{Start: 1600, End: 1700, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
{Start: 1100, End: 1200},
{Start: 1300, End: 1400},
{Start: 1500, End: 1600},
{Start: 1700, End: 2000},
},
},
{
name: "Non-overlapping cached intervals outside requested range",
requestedStart: 2000,
requestedEnd: 3000,
step: 60,
cacheKey: "testKey8",
cachedData: []querycache.CachedSeriesData{
{Start: 1000, End: 1500, Data: []*v3.Series{}},
{Start: 3500, End: 4000, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
{Start: 2000, End: 3000},
},
},
{
name: "No cached data at all",
requestedStart: 1000,
requestedEnd: 2000,
step: 60,
cacheKey: "testKey10",
cachedData: nil,
expectedMiss: []querycache.MissInterval{
{Start: 1000, End: 2000},
},
},
{
name: "Cached intervals with overlapping and non-overlapping mix",
requestedStart: 1000,
requestedEnd: 5000,
step: 60,
cacheKey: "testKey11",
cachedData: []querycache.CachedSeriesData{
{Start: 1000, End: 2000, Data: []*v3.Series{}},
{Start: 1500, End: 2500, Data: []*v3.Series{}}, // Overlaps with previous
{Start: 3000, End: 3500, Data: []*v3.Series{}},
{Start: 4000, End: 4500, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
{Start: 2500, End: 3000},
{Start: 3500, End: 4000},
{Start: 4500, End: 5000},
},
},
{
name: "Cached intervals covering the edges but missing middle",
requestedStart: 1000,
requestedEnd: 5000,
step: 60,
cacheKey: "testKey12",
cachedData: []querycache.CachedSeriesData{
{Start: 1000, End: 1500, Data: []*v3.Series{}},
{Start: 4500, End: 5000, Data: []*v3.Series{}},
},
expectedMiss: []querycache.MissInterval{
{Start: 1500, End: 4500},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Store the cached data in the mock cache
if len(tc.cachedData) > 0 {
cachedDataJSON, err := json.Marshal(tc.cachedData)
assert.NoError(t, err)
err = mockCache.Store(tc.cacheKey, cachedDataJSON, 0)
assert.NoError(t, err)
}
// Call FindMissingTimeRanges
missingRanges := q.FindMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.step, tc.cacheKey)
// Verify the missing ranges
assert.Equal(t, tc.expectedMiss, missingRanges)
})
}
}
func TestMergeWithCachedSeriesData(t *testing.T) {
// Initialize the mock cache
mockCache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
// Create a queryCache instance with the mock cache and a fluxInterval
q := querycache.NewQueryCache(
querycache.WithCache(mockCache),
querycache.WithFluxInterval(0), // Set to zero for testing purposes
)
// Define test data
cacheKey := "mergeTestKey"
// Existing cached data
existingData := []querycache.CachedSeriesData{
{
Start: 1000,
End: 2000,
Data: []*v3.Series{
{
Labels: map[string]string{"metric": "cpu", "instance": "localhost"},
Points: []v3.Point{
{Timestamp: 1500, Value: 0.5},
},
},
},
},
}
// New data to merge
newData := []querycache.CachedSeriesData{
{
Start: 1500,
End: 2500,
Data: []*v3.Series{
{
Labels: map[string]string{"metric": "cpu", "instance": "localhost"},
Points: []v3.Point{
{Timestamp: 1750, Value: 0.6},
},
},
{
Labels: map[string]string{"metric": "memory", "instance": "localhost"},
Points: []v3.Point{
{Timestamp: 1800, Value: 0.7},
},
},
},
},
}
// Expected merged data
expectedMergedData := []querycache.CachedSeriesData{
{
Start: 1000,
End: 2500,
Data: []*v3.Series{
{
Labels: map[string]string{"metric": "cpu", "instance": "localhost"},
Points: []v3.Point{
{Timestamp: 1500, Value: 0.5},
{Timestamp: 1750, Value: 0.6},
},
},
{
Labels: map[string]string{"metric": "memory", "instance": "localhost"},
Points: []v3.Point{
{Timestamp: 1800, Value: 0.7},
},
},
},
},
}
// Store existing data in cache
cachedDataJSON, err := json.Marshal(existingData)
assert.NoError(t, err)
err = mockCache.Store(cacheKey, cachedDataJSON, 0)
assert.NoError(t, err)
// Call MergeWithCachedSeriesData
mergedData := q.MergeWithCachedSeriesData(cacheKey, newData)
// Verify the merged data
assert.Equal(t, len(expectedMergedData), len(mergedData))
for i, expected := range expectedMergedData {
actual := mergedData[i]
assert.Equal(t, expected.Start, actual.Start)
assert.Equal(t, expected.End, actual.End)
assert.Equal(t, len(expected.Data), len(actual.Data))
for j, expectedSeries := range expected.Data {
actualSeries := actual.Data[j]
assert.Equal(t, expectedSeries.Labels, actualSeries.Labels)
assert.Equal(t, len(expectedSeries.Points), len(actualSeries.Points))
for k, expectedPoint := range expectedSeries.Points {
actualPoint := actualSeries.Points[k]
assert.Equal(t, expectedPoint.Timestamp, actualPoint.Timestamp)
assert.Equal(t, expectedPoint.Value, actualPoint.Value)
}
}
}
}