mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-10 04:49:02 +08:00
fix: querier v2 synced and tablePanel result processor updated (#4807)
This commit is contained in:
parent
87534b6fb6
commit
a7b0ef55ad
@ -3722,6 +3722,7 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
|
||||
}
|
||||
|
||||
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
|
||||
|
||||
result, err = postProcessResult(result, queryRangeParams)
|
||||
}
|
||||
|
||||
@ -3786,6 +3787,12 @@ func postProcessResult(result []*v3.Result, queryRangeParams *v3.QueryRangeParam
|
||||
// We apply the functions here it's easier to add new functions
|
||||
applyFunctions(result, queryRangeParams)
|
||||
|
||||
// expressions are executed at query serivce so the value of time.now in the invdividual
|
||||
// queries will be different so for table panel we are making it same.
|
||||
if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTable {
|
||||
tablePanelResultProcessor(result)
|
||||
}
|
||||
|
||||
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
|
||||
// The way we distinguish between a formula and a query is by checking if the expression
|
||||
// is the same as the query name
|
||||
@ -3838,3 +3845,18 @@ func applyFunctions(results []*v3.Result, queryRangeParams *v3.QueryRangeParamsV
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tablePanelResultProcessor(results []*v3.Result) {
|
||||
var ts int64
|
||||
for ridx := range results {
|
||||
for sidx := range results[ridx].Series {
|
||||
for pidx := range results[ridx].Series[sidx].Points {
|
||||
if ts == 0 {
|
||||
ts = results[ridx].Series[sidx].Points[pidx].Timestamp
|
||||
} else {
|
||||
results[ridx].Series[sidx].Points[pidx].Timestamp = ts
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,61 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func prepareLogsQuery(ctx context.Context,
|
||||
start,
|
||||
end int64,
|
||||
builderQuery *v3.BuilderQuery,
|
||||
params *v3.QueryRangeParamsV3,
|
||||
preferRPM bool,
|
||||
) (string, error) {
|
||||
query := ""
|
||||
|
||||
if params == nil || builderQuery == nil {
|
||||
return query, fmt.Errorf("params and builderQuery cannot be nil")
|
||||
}
|
||||
|
||||
// for ts query with limit replace it as it is already formed
|
||||
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
|
||||
limitQuery, err := logsV3.PrepareLogsQuery(
|
||||
start,
|
||||
end,
|
||||
params.CompositeQuery.QueryType,
|
||||
params.CompositeQuery.PanelType,
|
||||
builderQuery,
|
||||
logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
|
||||
)
|
||||
if err != nil {
|
||||
return query, err
|
||||
}
|
||||
placeholderQuery, err := logsV3.PrepareLogsQuery(
|
||||
start,
|
||||
end,
|
||||
params.CompositeQuery.QueryType,
|
||||
params.CompositeQuery.PanelType,
|
||||
builderQuery,
|
||||
logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
|
||||
)
|
||||
if err != nil {
|
||||
return query, err
|
||||
}
|
||||
query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
|
||||
return query, err
|
||||
}
|
||||
|
||||
query, err := logsV3.PrepareLogsQuery(
|
||||
start,
|
||||
end,
|
||||
params.CompositeQuery.QueryType,
|
||||
params.CompositeQuery.PanelType,
|
||||
builderQuery,
|
||||
logsV3.Options{PreferRPM: preferRPM},
|
||||
)
|
||||
if err != nil {
|
||||
return query, err
|
||||
}
|
||||
return query, err
|
||||
}
|
||||
|
||||
func (q *querier) runBuilderQuery(
|
||||
ctx context.Context,
|
||||
builderQuery *v3.BuilderQuery,
|
||||
@ -48,54 +103,82 @@ func (q *querier) runBuilderQuery(
|
||||
if builderQuery.DataSource == v3.DataSourceLogs {
|
||||
var query string
|
||||
var err error
|
||||
// for ts query with limit replace it as it is already formed
|
||||
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
|
||||
limitQuery, err := logsV3.PrepareLogsQuery(
|
||||
start,
|
||||
end,
|
||||
params.CompositeQuery.QueryType,
|
||||
params.CompositeQuery.PanelType,
|
||||
builderQuery,
|
||||
logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
|
||||
)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
|
||||
return
|
||||
}
|
||||
placeholderQuery, err := logsV3.PrepareLogsQuery(
|
||||
start,
|
||||
end,
|
||||
params.CompositeQuery.QueryType,
|
||||
params.CompositeQuery.PanelType,
|
||||
builderQuery,
|
||||
logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
|
||||
)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: placeholderQuery, Series: nil}
|
||||
return
|
||||
}
|
||||
query = strings.Replace(placeholderQuery, "#LIMIT_PLACEHOLDER", limitQuery, 1)
|
||||
} else {
|
||||
query, err = logsV3.PrepareLogsQuery(
|
||||
start,
|
||||
end,
|
||||
params.CompositeQuery.QueryType,
|
||||
params.CompositeQuery.PanelType,
|
||||
builderQuery,
|
||||
logsV3.Options{PreferRPM: preferRPM},
|
||||
)
|
||||
if _, ok := cacheKeys[queryName]; !ok {
|
||||
query, err = prepareLogsQuery(ctx, start, end, builderQuery, params, preferRPM)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
|
||||
series, err := q.execClickHouseQuery(ctx, query)
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
|
||||
return
|
||||
}
|
||||
series, err := q.execClickHouseQuery(ctx, query)
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
|
||||
cacheKey := cacheKeys[queryName]
|
||||
var cachedData []byte
|
||||
if !params.NoCache && q.cache != nil {
|
||||
var retrieveStatus status.RetrieveStatus
|
||||
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
|
||||
zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
|
||||
if err == nil {
|
||||
cachedData = data
|
||||
}
|
||||
}
|
||||
misses := q.findMissingTimeRanges(start, end, params.Step, cachedData)
|
||||
missedSeries := make([]*v3.Series, 0)
|
||||
cachedSeries := make([]*v3.Series, 0)
|
||||
for _, miss := range misses {
|
||||
query, err = prepareLogsQuery(ctx, miss.start, miss.end, builderQuery, params, preferRPM)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
|
||||
return
|
||||
}
|
||||
series, err := q.execClickHouseQuery(ctx, query)
|
||||
if err != nil {
|
||||
ch <- channelResult{
|
||||
Err: err,
|
||||
Name: queryName,
|
||||
Query: query,
|
||||
Series: nil,
|
||||
}
|
||||
return
|
||||
}
|
||||
missedSeries = append(missedSeries, series...)
|
||||
}
|
||||
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
|
||||
zap.L().Error("error unmarshalling cached data", zap.Error(err))
|
||||
}
|
||||
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
|
||||
|
||||
var mergedSeriesData []byte
|
||||
var marshallingErr error
|
||||
missedSeriesLen := len(missedSeries)
|
||||
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
|
||||
// caching the data
|
||||
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
|
||||
if marshallingErr != nil {
|
||||
zap.L().Error("error marshalling merged series", zap.Error(marshallingErr))
|
||||
}
|
||||
}
|
||||
|
||||
// response doesn't need everything
|
||||
filterCachedPoints(mergedSeries, start, end)
|
||||
|
||||
ch <- channelResult{
|
||||
Err: nil,
|
||||
Name: queryName,
|
||||
Series: mergedSeries,
|
||||
}
|
||||
|
||||
// Cache the seriesList for future queries
|
||||
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
|
||||
// caching the data
|
||||
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
|
||||
if err != nil {
|
||||
zap.L().Error("error storing merged series", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -211,6 +294,19 @@ func (q *querier) runBuilderQuery(
|
||||
zap.L().Error("error unmarshalling cached data", zap.Error(err))
|
||||
}
|
||||
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
|
||||
var mergedSeriesData []byte
|
||||
var marshallingErr error
|
||||
missedSeriesLen := len(missedSeries)
|
||||
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
|
||||
// caching the data
|
||||
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
|
||||
if marshallingErr != nil {
|
||||
zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
|
||||
}
|
||||
}
|
||||
|
||||
// response doesn't need everything
|
||||
filterCachedPoints(mergedSeries, params.Start, params.End)
|
||||
|
||||
ch <- channelResult{
|
||||
Err: nil,
|
||||
@ -218,13 +314,8 @@ func (q *querier) runBuilderQuery(
|
||||
Series: mergedSeries,
|
||||
}
|
||||
// Cache the seriesList for future queries
|
||||
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
|
||||
mergedSeriesData, err := json.Marshal(mergedSeries)
|
||||
if err != nil {
|
||||
zap.L().Error("error marshalling merged series", zap.Error(err))
|
||||
return
|
||||
}
|
||||
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
|
||||
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
|
||||
err := q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
|
||||
if err != nil {
|
||||
zap.L().Error("error storing merged series", zap.Error(err))
|
||||
return
|
||||
@ -293,18 +384,27 @@ func (q *querier) runBuilderExpression(
|
||||
}
|
||||
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
|
||||
|
||||
var mergedSeriesData []byte
|
||||
missedSeriesLen := len(missedSeries)
|
||||
var marshallingErr error
|
||||
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
|
||||
// caching the data
|
||||
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
|
||||
if marshallingErr != nil {
|
||||
zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
|
||||
}
|
||||
}
|
||||
|
||||
// response doesn't need everything
|
||||
filterCachedPoints(mergedSeries, params.Start, params.End)
|
||||
|
||||
ch <- channelResult{
|
||||
Err: nil,
|
||||
Name: queryName,
|
||||
Series: mergedSeries,
|
||||
}
|
||||
// Cache the seriesList for future queries
|
||||
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
|
||||
mergedSeriesData, err := json.Marshal(mergedSeries)
|
||||
if err != nil {
|
||||
zap.L().Error("error marshalling merged series", zap.Error(err))
|
||||
return
|
||||
}
|
||||
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
|
||||
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
|
||||
if err != nil {
|
||||
zap.L().Error("error storing merged series", zap.Error(err))
|
||||
|
@ -241,6 +241,19 @@ func labelsToString(labels map[string]string) string {
|
||||
return fmt.Sprintf("{%s}", strings.Join(labelKVs, ","))
|
||||
}
|
||||
|
||||
func filterCachedPoints(cachedSeries []*v3.Series, start, end int64) {
|
||||
for _, c := range cachedSeries {
|
||||
points := []v3.Point{}
|
||||
for _, p := range c.Points {
|
||||
if p.Timestamp < start || p.Timestamp > end {
|
||||
continue
|
||||
}
|
||||
points = append(points, p)
|
||||
}
|
||||
c.Points = points
|
||||
}
|
||||
}
|
||||
|
||||
func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
|
||||
// Merge the missed series with the cached series by timestamp
|
||||
mergedSeries := make([]*v3.Series, 0)
|
||||
|
Loading…
x
Reference in New Issue
Block a user