feat: support cache in logs (#4516)

* feat: support cache in logs

* fix: revert fluxinterval changes

* feat: support for limit queries

* feat: support for formula

* fix: refactor code and don't return all points

* fix: add nil params check

* fix: error handling updated

* fix: start and end params fix
Nityananda Gohain 2024-02-12 18:45:21 +05:30 committed by GitHub
parent ab4f6adb19
commit 0e331dd177
4 changed files with 227 additions and 58 deletions


@@ -17,6 +17,61 @@ import (
"go.uber.org/zap"
)
func prepareLogsQuery(ctx context.Context,
start,
end int64,
builderQuery *v3.BuilderQuery,
params *v3.QueryRangeParamsV3,
preferRPM bool,
) (string, error) {
query := ""
if params == nil || builderQuery == nil {
return query, fmt.Errorf("params and builderQuery cannot be nil")
}
// for a timeseries query with a group-by limit, the query is built in two
// passes: the limit subquery is spliced into the placeholder of the main query
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := logsV3.PrepareLogsQuery(
start,
end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
return query, err
}
placeholderQuery, err := logsV3.PrepareLogsQuery(
start,
end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
return query, err
}
query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
return query, nil
}
query, err := logsV3.PrepareLogsQuery(
start,
end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
logsV3.Options{PreferRPM: preferRPM},
)
return query, err
}
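
For the graph-with-limit case above, the helper builds the query in two passes and splices the first result into the second. A minimal standalone sketch of that substitution step (the SQL strings here are illustrative placeholders, not the queries logsV3.PrepareLogsQuery actually emits):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// first pass: a query that selects the top-N group-by values (illustrative SQL)
	limitQuery := "SELECT host FROM logs GROUP BY host ORDER BY count() DESC LIMIT 10"
	// second pass: the full timeseries query, with a placeholder where the
	// limit subquery must go
	placeholderQuery := "SELECT ts, host, count() FROM logs WHERE host IN (#LIMIT_PLACEHOLDER) GROUP BY ts, host"
	// the querier replaces the placeholder exactly once, as in prepareLogsQuery
	query := strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
	fmt.Println(query)
}
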
func (q *querier) runBuilderQuery(
ctx context.Context,
builderQuery *v3.BuilderQuery,
@@ -35,59 +90,88 @@ func (q *querier) runBuilderQuery(
preferRPM = q.featureLookUp.CheckFeature(constants.PreferRPM) == nil
}
// TODO: handle other data sources
if builderQuery.DataSource == v3.DataSourceLogs {
var query string
var err error
		if _, ok := cacheKeys[queryName]; !ok {
			// no cache key was generated for this query: build it (the
			// two-pass graph-limit handling now lives in prepareLogsQuery),
			// run it directly, and return the result uncached
			query, err = prepareLogsQuery(ctx, params.Start, params.End, builderQuery, params, preferRPM)
			if err != nil {
				ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
				return
			}
			series, err := q.execClickHouseQuery(ctx, query)
			ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
			return
		}
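		// a cache key exists for this query: reuse cached series where
		// possible and hit ClickHouse only for the missing time ranges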
cacheKey := cacheKeys[queryName]
var cachedData []byte
if !params.NoCache && q.cache != nil {
var retrieveStatus status.RetrieveStatus
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
zap.S().Infof("cache retrieve status: %s", retrieveStatus.String())
if err == nil {
cachedData = data
}
}
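		// a nil cachedData (cache miss or retrieve error) makes the whole
		// [start, end] range count as missing below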
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
query, err = prepareLogsQuery(ctx, miss.start, miss.end, builderQuery, params, preferRPM)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
}
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
ch <- channelResult{
Err: err,
Name: queryName,
Query: query,
Series: nil,
}
return
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
zap.S().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
var mergedSeriesData []byte
var marshallingErr error
missedSeriesLen := len(missedSeries)
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
// marshal the merged series now; it is stored in the cache further below
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
if marshallingErr != nil {
zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
}
}
// the response only needs points inside the requested [start, end] window
filterCachedPoints(mergedSeries, params.Start, params.End)
ch <- channelResult{
Err: nil,
Name: queryName,
Series: mergedSeries,
}
// Cache the seriesList for future queries
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
// caching the data
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.S().Error("error storing merged series", zap.Error(err))
return
}
}
return
}
if builderQuery.DataSource == v3.DataSourceTraces {
@@ -202,20 +286,28 @@ func (q *querier) runBuilderQuery(
zap.S().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
var mergedSeriesData []byte
var marshallingErr error
missedSeriesLen := len(missedSeries)
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
// marshal the merged series now; it is stored in the cache further below
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
if marshallingErr != nil {
zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
}
}
// the response only needs points inside the requested [start, end] window
filterCachedPoints(mergedSeries, params.Start, params.End)
ch <- channelResult{
Err: nil,
Name: queryName,
Series: mergedSeries,
}
// Cache the seriesList for future queries
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
	err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.S().Error("error storing merged series", zap.Error(err))
return
@@ -284,18 +376,27 @@ func (q *querier) runBuilderExpression(
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
var mergedSeriesData []byte
missedSeriesLen := len(missedSeries)
var marshallingErr error
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
// marshal the merged series now; it is stored in the cache further below
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
if marshallingErr != nil {
zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
}
}
// the response only needs points inside the requested [start, end] window
filterCachedPoints(mergedSeries, params.Start, params.End)
ch <- channelResult{
Err: nil,
Name: queryName,
Series: mergedSeries,
}
// Cache the seriesList for future queries
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.S().Error("error storing merged series", zap.Error(err))


@@ -160,6 +160,8 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux
}
}
// time.Now is used because data for the last (fluxInterval) minutes
// might not be fully ingested yet
endMillis := time.Now().UnixMilli()
adjustStep := int64(math.Min(float64(step), 60))
roundedMillis := endMillis - (endMillis % (adjustStep * 1000))
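
A quick worked example of that rounding, assuming a 5-minute step: adjustStep is capped at 60s, so the end timestamp snaps back to the last full minute (the input value below is a hypothetical "now"):

package main

import (
	"fmt"
	"math"
)

func main() {
	endMillis := int64(1707741123456) // hypothetical "now" in milliseconds
	step := int64(300)                // 5m step, in seconds
	adjustStep := int64(math.Min(float64(step), 60))
	roundedMillis := endMillis - (endMillis % (adjustStep * 1000))
	fmt.Println(roundedMillis) // 1707741120000: snapped back to the minute boundary
}
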
@@ -241,6 +243,19 @@ func labelsToString(labels map[string]string) string {
return fmt.Sprintf("{%s}", strings.Join(labelKVs, ","))
}
func filterCachedPoints(cachedSeries []*v3.Series, start, end int64) {
for _, c := range cachedSeries {
points := []v3.Point{}
for _, p := range c.Points {
if p.Timestamp < start || p.Timestamp > end {
continue
}
points = append(points, p)
}
c.Points = points
}
}
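
A standalone illustration of the trimming behaviour, using local stand-ins for v3.Series and v3.Point (hypothetical types, just to exercise the same loop):

package main

import "fmt"

type point struct{ Timestamp int64 }
type series struct{ Points []point }

// same filtering logic as filterCachedPoints, on the stand-in types
func filter(s *series, start, end int64) {
	points := []point{}
	for _, p := range s.Points {
		if p.Timestamp < start || p.Timestamp > end {
			continue
		}
		points = append(points, p)
	}
	s.Points = points
}

func main() {
	s := &series{Points: []point{{Timestamp: 90}, {Timestamp: 150}, {Timestamp: 250}}}
	filter(s, 100, 200)
	fmt.Println(len(s.Points)) // 1: only the point at 150 lies inside [100, 200]
}
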
func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
// Merge the missed series with the cached series by timestamp
mergedSeries := make([]*v3.Series, 0)


@@ -302,6 +302,16 @@ func isMetricExpression(expression *govaluate.EvaluableExpression, params *v3.Qu
return true
}
func isLogExpression(expression *govaluate.EvaluableExpression, params *v3.QueryRangeParamsV3) bool {
variables := unique(expression.Vars())
for _, variable := range variables {
if params.CompositeQuery.BuilderQueries[variable].DataSource != v3.DataSourceLogs {
return false
}
}
return true
}
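
isLogExpression mirrors the existing isMetricExpression check: it extracts the variable names referenced by the formula and verifies that every one of them is a logs builder query. A small sketch of the variable extraction, using govaluate directly:

package main

import (
	"fmt"

	"github.com/Knetic/govaluate"
)

func main() {
	// "A" and "B" stand for builder query names referenced by a formula
	expr, err := govaluate.NewEvaluableExpression("A / B")
	if err != nil {
		panic(err)
	}
	fmt.Println(expr.Vars()) // [A B]
}
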
func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string {
keys := make(map[string]string)
@@ -320,7 +330,46 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
// Build keys for each builder query
for queryName, query := range params.CompositeQuery.BuilderQueries {
if query.Expression == queryName && query.DataSource == v3.DataSourceLogs {
var parts []string
// We need to build a unique cache key for the BuilderQuery
parts = append(parts, fmt.Sprintf("source=%s", query.DataSource))
parts = append(parts, fmt.Sprintf("step=%d", query.StepInterval))
parts = append(parts, fmt.Sprintf("aggregate=%s", query.AggregateOperator))
parts = append(parts, fmt.Sprintf("limit=%d", query.Limit))
if query.AggregateAttribute.Key != "" {
parts = append(parts, fmt.Sprintf("aggregateAttribute=%s", query.AggregateAttribute.CacheKey()))
}
if query.Filters != nil && len(query.Filters.Items) > 0 {
for idx, filter := range query.Filters.Items {
parts = append(parts, fmt.Sprintf("filter-%d=%s", idx, filter.CacheKey()))
}
}
if len(query.GroupBy) > 0 {
for idx, groupBy := range query.GroupBy {
parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
}
}
if len(query.OrderBy) > 0 {
for idx, orderBy := range query.OrderBy {
parts = append(parts, fmt.Sprintf("orderBy-%d=%s", idx, orderBy.CacheKey()))
}
}
if len(query.Having) > 0 {
for idx, having := range query.Having {
parts = append(parts, fmt.Sprintf("having-%d=%s", idx, having.CacheKey()))
}
}
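// the joined key looks like, e.g. (illustrative values):
// "source=logs&step=60&aggregate=count&limit=10&orderBy-0=timestamp-desc"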
key := strings.Join(parts, "&")
keys[queryName] = key
} else if query.Expression == queryName && query.DataSource == v3.DataSourceMetrics {
var parts []string
// We need to build a unique cache key for the BuilderQuery
@@ -361,7 +410,7 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
if query.Expression != query.QueryName {
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs)
if !isMetricExpression(expression, params) && !isLogExpression(expression, params) {
continue
}


@@ -817,6 +817,10 @@ type OrderBy struct {
IsColumn bool `json:"-"`
}
func (o OrderBy) CacheKey() string {
return fmt.Sprintf("%s-%s", o.ColumnName, o.Order)
}
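// e.g. OrderBy{ColumnName: "timestamp", Order: "desc"}.CacheKey() == "timestamp-desc"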
// See HAVING_OPERATORS in queryBuilder.ts
type HavingOperator string