From 0e331dd177b76032f49e87dc708a2bd91b6f9a68 Mon Sep 17 00:00:00 2001
From: Nityananda Gohain
Date: Mon, 12 Feb 2024 18:45:21 +0530
Subject: [PATCH] feat: support cache in logs (#4516)

* feat: support cache in logs
* fix: revert fluxinterval changes
* feat: support for limit queries
* feat: support for formula
* fix: refactor code and don't return all points
* fix: add nil params check
* fix: error handling updated
* fix: start and end params fix
---
 pkg/query-service/app/querier/helper.go       | 213 +++++++++++++-----
 pkg/query-service/app/querier/querier.go      |  15 ++
 .../app/queryBuilder/query_builder.go         |  53 ++++-
 pkg/query-service/model/v3/v3.go              |   4 +
 4 files changed, 227 insertions(+), 58 deletions(-)

diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go
index 5bb3cc81f8..addd9744e3 100644
--- a/pkg/query-service/app/querier/helper.go
+++ b/pkg/query-service/app/querier/helper.go
@@ -17,6 +17,61 @@ import (
 	"go.uber.org/zap"
 )
 
+func prepareLogsQuery(ctx context.Context,
+	start,
+	end int64,
+	builderQuery *v3.BuilderQuery,
+	params *v3.QueryRangeParamsV3,
+	preferRPM bool,
+) (string, error) {
+	query := ""
+
+	if params == nil || builderQuery == nil {
+		return query, fmt.Errorf("params and builderQuery cannot be nil")
+	}
+
+	// for a timeseries query with limit, replace the placeholder with the already formed limit query
+	if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
+		limitQuery, err := logsV3.PrepareLogsQuery(
+			start,
+			end,
+			params.CompositeQuery.QueryType,
+			params.CompositeQuery.PanelType,
+			builderQuery,
+			logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
+		)
+		if err != nil {
+			return query, err
+		}
+		placeholderQuery, err := logsV3.PrepareLogsQuery(
+			start,
+			end,
+			params.CompositeQuery.QueryType,
+			params.CompositeQuery.PanelType,
+			builderQuery,
+			logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
+		)
+		if err != nil {
+			return query, err
+		}
+		query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
+		return query, err
+	}
+
+	query, err := logsV3.PrepareLogsQuery(
+		start,
+		end,
+		params.CompositeQuery.QueryType,
+		params.CompositeQuery.PanelType,
+		builderQuery,
+		logsV3.Options{PreferRPM: preferRPM},
+	)
+	if err != nil {
+		return query, err
+	}
+	return query, err
+}
+
 func (q *querier) runBuilderQuery(
 	ctx context.Context,
 	builderQuery *v3.BuilderQuery,
@@ -35,59 +90,88 @@ func (q *querier) runBuilderQuery(
 		preferRPM = q.featureLookUp.CheckFeature(constants.PreferRPM) == nil
 	}
 
-	// TODO: handle other data sources
 	if builderQuery.DataSource == v3.DataSourceLogs {
 		var query string
 		var err error
-		// for ts query with limit replace it as it is already formed
-		if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
-			limitQuery, err := logsV3.PrepareLogsQuery(
-				params.Start,
-				params.End,
-				params.CompositeQuery.QueryType,
-				params.CompositeQuery.PanelType,
-				builderQuery,
-				logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
-			)
-			if err != nil {
-				ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
-				return
-			}
-			placeholderQuery, err := logsV3.PrepareLogsQuery(
-				params.Start,
-				params.End,
-				params.CompositeQuery.QueryType,
-				params.CompositeQuery.PanelType,
-				builderQuery,
-				logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
-			)
-			if 
err != nil { - ch <- channelResult{Err: err, Name: queryName, Query: placeholderQuery, Series: nil} - return - } - query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1) - } else { - query, err = logsV3.PrepareLogsQuery( - params.Start, - params.End, - params.CompositeQuery.QueryType, - params.CompositeQuery.PanelType, - builderQuery, - logsV3.Options{PreferRPM: preferRPM}, - ) + if _, ok := cacheKeys[queryName]; !ok { + query, err = prepareLogsQuery(ctx, params.Start, params.End, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return } - } - - if err != nil { - ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} return } - series, err := q.execClickHouseQuery(ctx, query) - ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + + cacheKey := cacheKeys[queryName] + var cachedData []byte + if !params.NoCache && q.cache != nil { + var retrieveStatus status.RetrieveStatus + data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) + zap.S().Infof("cache retrieve status: %s", retrieveStatus.String()) + if err == nil { + cachedData = data + } + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + query, err = prepareLogsQuery(ctx, miss.start, miss.end, builderQuery, params, preferRPM) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + ch <- channelResult{ + Err: err, + Name: queryName, + Query: query, + Series: nil, + } + return + } + missedSeries = append(missedSeries, series...) 
+		}
+		if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
+			zap.S().Error("error unmarshalling cached data", zap.Error(err))
+		}
+		mergedSeries := mergeSerieses(cachedSeries, missedSeries)
+
+		var mergedSeriesData []byte
+		var marshallingErr error
+		missedSeriesLen := len(missedSeries)
+		if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
+			// caching the data
+			mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
+			if marshallingErr != nil {
+				zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
+			}
+		}
+
+		// the response only needs points within the requested [start, end] window
+		filterCachedPoints(mergedSeries, params.Start, params.End)
+
+		ch <- channelResult{
+			Err:    nil,
+			Name:   queryName,
+			Series: mergedSeries,
+		}
+
+		// Cache the seriesList for future queries
+		if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
+			// caching the data
+			err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
+			if err != nil {
+				zap.S().Error("error storing merged series", zap.Error(err))
+				return
+			}
+		}
+		return
+
 	}
 
 	if builderQuery.DataSource == v3.DataSourceTraces {
@@ -202,20 +286,28 @@ func (q *querier) runBuilderQuery(
 			zap.S().Error("error unmarshalling cached data", zap.Error(err))
 		}
 		mergedSeries := mergeSerieses(cachedSeries, missedSeries)
+		var mergedSeriesData []byte
+		var marshallingErr error
+		missedSeriesLen := len(missedSeries)
+		if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
+			// caching the data
+			mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
+			if marshallingErr != nil {
+				zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
+			}
+		}
+
+		// the response only needs points within the requested [start, end] window
+		filterCachedPoints(mergedSeries, params.Start, params.End)
 
 		ch <- channelResult{
 			Err:    nil,
 			Name:   queryName,
 			Series: mergedSeries,
 		}
+
 		// Cache the seriesList for future queries
-		if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
-			mergedSeriesData, err := json.Marshal(mergedSeries)
-			if err != nil {
-				zap.S().Error("error marshalling merged series", zap.Error(err))
-				return
-			}
-			err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
+		if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
+			err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
 			if err != nil {
 				zap.S().Error("error storing merged series", zap.Error(err))
 				return
@@ -284,18 +376,27 @@ func (q *querier) runBuilderExpression(
 	}
 	mergedSeries := mergeSerieses(cachedSeries, missedSeries)
+	var mergedSeriesData []byte
+	missedSeriesLen := len(missedSeries)
+	var marshallingErr error
+	if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
+		// caching the data
+		mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
+		if marshallingErr != nil {
+			zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
+		}
+	}
+
+	// the response only needs points within the requested [start, end] window
+	filterCachedPoints(mergedSeries, params.Start, params.End)
 	ch <- channelResult{
 		Err:    nil,
 		Name:   queryName,
 		Series: mergedSeries,
 	}
+
 	// Cache the seriesList for future queries
-	if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
-		mergedSeriesData, err := json.Marshal(mergedSeries)
-		if err != nil {
-			zap.S().Error("error marshalling merged series", zap.Error(err))
-			return
-		}
+	if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
 		err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
 		if err != nil {
 			zap.S().Error("error storing merged 
series", zap.Error(err))
diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go
index 16ef778d20..103660f8bc 100644
--- a/pkg/query-service/app/querier/querier.go
+++ b/pkg/query-service/app/querier/querier.go
@@ -160,6 +160,8 @@ func findMissingTimeRanges(start, end, step int64, seriesList []*v3.Series, flux
 		}
 	}
 
+	// time.Now is used here because data for the last (fluxInterval) minutes
+	// might not be fully ingested yet
 	endMillis := time.Now().UnixMilli()
 	adjustStep := int64(math.Min(float64(step), 60))
 	roundedMillis := endMillis - (endMillis % (adjustStep * 1000))
@@ -241,6 +243,19 @@ func labelsToString(labels map[string]string) string {
 	return fmt.Sprintf("{%s}", strings.Join(labelKVs, ","))
 }
 
+func filterCachedPoints(cachedSeries []*v3.Series, start, end int64) {
+	for _, c := range cachedSeries {
+		points := []v3.Point{}
+		for _, p := range c.Points {
+			if p.Timestamp < start || p.Timestamp > end {
+				continue
+			}
+			points = append(points, p)
+		}
+		c.Points = points
+	}
+}
+
 func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
 	// Merge the missed series with the cached series by timestamp
 	mergedSeries := make([]*v3.Series, 0)
diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go
index 5b4ad4291f..988acfb458 100644
--- a/pkg/query-service/app/queryBuilder/query_builder.go
+++ b/pkg/query-service/app/queryBuilder/query_builder.go
@@ -302,6 +302,16 @@ func isMetricExpression(expression *govaluate.EvaluableExpression, params *v3.Qu
 	return true
 }
 
+func isLogExpression(expression *govaluate.EvaluableExpression, params *v3.QueryRangeParamsV3) bool {
+	variables := unique(expression.Vars())
+	for _, variable := range variables {
+		if params.CompositeQuery.BuilderQueries[variable].DataSource != v3.DataSourceLogs {
+			return false
+		}
+	}
+	return true
+}
+
 func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string {
 	keys := make(map[string]string)
 
@@ -320,7 +330,46 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
 
 	// Build keys for each builder query
 	for queryName, query := range params.CompositeQuery.BuilderQueries {
-		if query.Expression == queryName && query.DataSource == v3.DataSourceMetrics {
+		if query.Expression == queryName && query.DataSource == v3.DataSourceLogs {
+			var parts []string
+
+			// We need to build a unique cache key for the BuilderQuery
+			parts = append(parts, fmt.Sprintf("source=%s", query.DataSource))
+			parts = append(parts, fmt.Sprintf("step=%d", query.StepInterval))
+			parts = append(parts, fmt.Sprintf("aggregate=%s", query.AggregateOperator))
+			parts = append(parts, fmt.Sprintf("limit=%d", query.Limit))
+
+			if query.AggregateAttribute.Key != "" {
+				parts = append(parts, fmt.Sprintf("aggregateAttribute=%s", query.AggregateAttribute.CacheKey()))
+			}
+
+			if query.Filters != nil && len(query.Filters.Items) > 0 {
+				for idx, filter := range query.Filters.Items {
+					parts = append(parts, fmt.Sprintf("filter-%d=%s", idx, filter.CacheKey()))
+				}
+			}
+
+			if len(query.GroupBy) > 0 {
+				for idx, groupBy := range query.GroupBy {
+					parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
+				}
+			}
+
+			if len(query.OrderBy) > 0 {
+				for idx, orderBy := range query.OrderBy {
+					parts = append(parts, fmt.Sprintf("orderBy-%d=%s", idx, orderBy.CacheKey()))
+				}
+			}
+
+			if len(query.Having) > 0 {
+				for idx, having := range query.Having {
+					parts = append(parts, 
fmt.Sprintf("having-%d=%s", idx, having.CacheKey())) + } + } + + key := strings.Join(parts, "&") + keys[queryName] = key + } else if query.Expression == queryName && query.DataSource == v3.DataSourceMetrics { var parts []string // We need to build uniqe cache query for BuilderQuery @@ -361,7 +410,7 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri if query.Expression != query.QueryName { expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs) - if !isMetricExpression(expression, params) { + if !isMetricExpression(expression, params) && !isLogExpression(expression, params) { continue } diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index 0fe9dcdde2..010b0b41c1 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -817,6 +817,10 @@ type OrderBy struct { IsColumn bool `json:"-"` } +func (o OrderBy) CacheKey() string { + return fmt.Sprintf("%s-%s", o.ColumnName, o.Order) +} + // See HAVING_OPERATORS in queryBuilder.ts type HavingOperator string
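
Reviewer note: below is a minimal, self-contained sketch of the caching flow this patch wires up for logs: cached series and freshly queried ("missed") series are merged by label set, and the merged result is trimmed to the requested [start, end] window before it is sent back. The Point and Series types and the mergeSerieses/labelsKey bodies are simplified stand-ins, not the actual v3 types or querier helpers; only filterCachedPoints mirrors the helper added in this patch.

package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for v3.Point and v3.Series.
type Point struct {
	Timestamp int64 // epoch millis
	Value     float64
}

type Series struct {
	Labels map[string]string
	Points []Point
}

// labelsKey builds a stable string key from a label set so series with the
// same labels can be matched up (the querier uses labelsToString similarly).
func labelsKey(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := ""
	for _, k := range keys {
		out += k + "=" + labels[k] + ";"
	}
	return out
}

// mergeSerieses merges the cached series with the series fetched for the
// missed time ranges, concatenating points of series that share a label set.
func mergeSerieses(cachedSeries, missedSeries []*Series) []*Series {
	merged := make([]*Series, 0)
	byLabels := map[string]*Series{}
	for _, s := range append(append([]*Series{}, cachedSeries...), missedSeries...) {
		k := labelsKey(s.Labels)
		if existing, ok := byLabels[k]; ok {
			existing.Points = append(existing.Points, s.Points...)
			continue
		}
		copied := &Series{Labels: s.Labels, Points: append([]Point{}, s.Points...)}
		byLabels[k] = copied
		merged = append(merged, copied)
	}
	return merged
}

// filterCachedPoints mirrors the helper added in this patch: the cache may
// hold a wider window than the caller asked for, so drop points outside
// [start, end] before responding.
func filterCachedPoints(cachedSeries []*Series, start, end int64) {
	for _, c := range cachedSeries {
		points := []Point{}
		for _, p := range c.Points {
			if p.Timestamp < start || p.Timestamp > end {
				continue
			}
			points = append(points, p)
		}
		c.Points = points
	}
}

func main() {
	cached := []*Series{{
		Labels: map[string]string{"service": "api"},
		Points: []Point{{1000, 1}, {2000, 2}, {3000, 3}},
	}}
	missed := []*Series{{
		Labels: map[string]string{"service": "api"},
		Points: []Point{{4000, 4}, {5000, 5}},
	}}
	merged := mergeSerieses(cached, missed)
	filterCachedPoints(merged, 2000, 4000) // the window the caller asked for
	for _, s := range merged {
		fmt.Println(s.Labels, s.Points) // map[service:api] [{2000 2} {3000 3} {4000 4}]
	}
}

Two details worth calling out in the diff itself: the merged series are written back with q.cache.Store(cacheKey, mergedSeriesData, time.Hour) only when at least one missed range was actually queried and marshalling succeeded (the missedSeriesLen > 0 and marshallingErr == nil guards), and the cache key for a logs query is the "&"-joined parts built in GenerateKeys, e.g. something like source=logs&step=60&aggregate=count&limit=0, plus filter/groupBy/orderBy/having parts when present.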