diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 7f9e6795a7..9244700af0 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -3722,6 +3722,7 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que } if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { + result, err = postProcessResult(result, queryRangeParams) } @@ -3786,6 +3787,12 @@ func postProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam // We apply the functions here it's easier to add new functions applyFunctions(result, queryRangeParams) + // expressions are executed at query serivce so the value of time.now in the invdividual + // queries will be different so for table panel we are making it same. + if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTable { + tablePanelResultProcessor(result) + } + for _, query := range queryRangeParams.CompositeQuery.BuilderQueries { // The way we distinguish between a formula and a query is by checking if the expression // is the same as the query name @@ -3838,3 +3845,18 @@ func applyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV } } } + +func tablePanelResultProcessor(results []*v3.Result) { + var ts int64 + for ridx := range results { + for sidx := range results[ridx].Series { + for pidx := range results[ridx].Series[sidx].Points { + if ts == 0 { + ts = results[ridx].Series[sidx].Points[pidx].Timestamp + } else { + results[ridx].Series[sidx].Points[pidx].Timestamp = ts + } + } + } + } +} diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index e564956f19..cc4e83b702 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -18,6 +18,61 @@ import ( "go.uber.org/zap" ) +func prepareLogsQuery(ctx context.Context, + start, + end int64, + builderQuery *v3.BuilderQuery, + params *v3.QueryRangeParamsV3, + preferRPM bool, +) (string, error) { + query := "" + + if params == nil || builderQuery == nil { + return query, fmt.Errorf("params and builderQuery cannot be nil") + } + + // for ts query with limit replace it as it is already formed + if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { + limitQuery, err := logsV3.PrepareLogsQuery( + start, + end, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + return query, err + } + placeholderQuery, err := logsV3.PrepareLogsQuery( + start, + end, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + return query, err + } + query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1) + return query, err + } + + query, err := logsV3.PrepareLogsQuery( + start, + end, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{PreferRPM: preferRPM}, + ) + if err != nil { + return query, err + } + return query, err +} + func (q *querier) runBuilderQuery( ctx context.Context, builderQuery *v3.BuilderQuery, @@ -48,54 +103,82 @@ func (q *querier) runBuilderQuery( if builderQuery.DataSource == v3.DataSourceLogs { var query string var err error - // for ts query with limit replace it as it is already formed - if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { - limitQuery, err := logsV3.PrepareLogsQuery( - start, - end, - params.CompositeQuery.QueryType, - params.CompositeQuery.PanelType, - builderQuery, - logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, - ) - if err != nil { - ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} - return - } - placeholderQuery, err := logsV3.PrepareLogsQuery( - start, - end, - params.CompositeQuery.QueryType, - params.CompositeQuery.PanelType, - builderQuery, - logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, - ) - if err != nil { - ch <- channelResult{Err: err, Name: queryName, Query: placeholderQuery, Series: nil} - return - } - query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1) - } else { - query, err = logsV3.PrepareLogsQuery( - start, - end, - params.CompositeQuery.QueryType, - params.CompositeQuery.PanelType, - builderQuery, - logsV3.Options{PreferRPM: preferRPM}, - ) + if _, ok := cacheKeys[queryName]; !ok { + query, err = prepareLogsQuery(ctx, start, end, builderQuery, params, preferRPM) if err != nil { ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} return } - } - - if err != nil { - ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} return } - series, err := q.execClickHouseQuery(ctx, query) - ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + cacheKey := cacheKeys[queryName] + var cachedData []byte + if !params.NoCache && q.cache != nil { + var retrieveStatus status.RetrieveStatus + data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) + zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) + if err == nil { + cachedData = data + } + } + misses := q.findMissingTimeRanges(start, end, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + query, err = prepareLogsQuery(ctx, miss.start, miss.end, builderQuery, params, preferRPM) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + ch <- channelResult{ + Err: err, + Name: queryName, + Query: query, + Series: nil, + } + return + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + zap.L().Error("error unmarshalling cached data", zap.Error(err)) + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + var mergedSeriesData []byte + var marshallingErr error + missedSeriesLen := len(missedSeries) + if missedSeriesLen > 0 && !params.NoCache && q.cache != nil { + // caching the data + mergedSeriesData, marshallingErr = json.Marshal(mergedSeries) + if marshallingErr != nil { + zap.L().Error("error marshalling merged series", zap.Error(marshallingErr)) + } + } + + // response doesn't need everything + filterCachedPoints(mergedSeries, start, end) + + ch <- channelResult{ + Err: nil, + Name: queryName, + Series: mergedSeries, + } + + // Cache the seriesList for future queries + if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil { + // caching the data + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + zap.L().Error("error storing merged series", zap.Error(err)) + return + } + } + return } @@ -211,6 +294,19 @@ func (q *querier) runBuilderQuery( zap.L().Error("error unmarshalling cached data", zap.Error(err)) } mergedSeries := mergeSerieses(cachedSeries, missedSeries) + var mergedSeriesData []byte + var marshallingErr error + missedSeriesLen := len(missedSeries) + if missedSeriesLen > 0 && !params.NoCache && q.cache != nil { + // caching the data + mergedSeriesData, marshallingErr = json.Marshal(mergedSeries) + if marshallingErr != nil { + zap.S().Error("error marshalling merged series", zap.Error(marshallingErr)) + } + } + + // response doesn't need everything + filterCachedPoints(mergedSeries, params.Start, params.End) ch <- channelResult{ Err: nil, @@ -218,13 +314,8 @@ func (q *querier) runBuilderQuery( Series: mergedSeries, } // Cache the seriesList for future queries - if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { - mergedSeriesData, err := json.Marshal(mergedSeries) - if err != nil { - zap.L().Error("error marshalling merged series", zap.Error(err)) - return - } - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil { + err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour) if err != nil { zap.L().Error("error storing merged series", zap.Error(err)) return @@ -293,18 +384,27 @@ func (q *querier) runBuilderExpression( } mergedSeries := mergeSerieses(cachedSeries, missedSeries) + var mergedSeriesData []byte + missedSeriesLen := len(missedSeries) + var marshallingErr error + if missedSeriesLen > 0 && !params.NoCache && q.cache != nil { + // caching the data + mergedSeriesData, marshallingErr = json.Marshal(mergedSeries) + if marshallingErr != nil { + zap.S().Error("error marshalling merged series", zap.Error(marshallingErr)) + } + } + + // response doesn't need everything + filterCachedPoints(mergedSeries, params.Start, params.End) + ch <- channelResult{ Err: nil, Name: queryName, Series: mergedSeries, } // Cache the seriesList for future queries - if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { - mergedSeriesData, err := json.Marshal(mergedSeries) - if err != nil { - zap.L().Error("error marshalling merged series", zap.Error(err)) - return - } + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil { err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) if err != nil { zap.L().Error("error storing merged series", zap.Error(err)) diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index e45153da7d..47933e847d 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -241,6 +241,19 @@ func labelsToString(labels map[string]string) string { return fmt.Sprintf("{%s}", strings.Join(labelKVs, ",")) } +func filterCachedPoints(cachedSeries []*v3.Series, start, end int64) { + for _, c := range cachedSeries { + points := []v3.Point{} + for _, p := range c.Points { + if p.Timestamp < start || p.Timestamp > end { + continue + } + points = append(points, p) + } + c.Points = points + } +} + func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series { // Merge the missed series with the cached series by timestamp mergedSeries := make([]*v3.Series, 0)