fix: incorrect response for promql value type panels (#5497)
This commit is contained in:
parent 3ecb2e35ef
commit 9194ab08b6
@@ -18,7 +18,7 @@ import (
 	"go.uber.org/zap"
 )
 
-func prepareLogsQuery(ctx context.Context,
+func prepareLogsQuery(_ context.Context,
 	start,
 	end int64,
 	builderQuery *v3.BuilderQuery,
@@ -50,8 +50,10 @@ type querier struct {
 	// TODO(srikanthccv): remove this once we have a proper mock
 	testingMode     bool
 	queriesExecuted []string
-	returnedSeries  []*v3.Series
-	returnedErr     error
+	// tuple of start and end time in milliseconds
+	timeRanges      [][]int
+	returnedSeries  []*v3.Series
+	returnedErr     error
 }
 
 type QuerierOptions struct {
@@ -117,6 +119,7 @@ func (q *querier) execClickHouseQuery(ctx context.Context, query string) ([]*v3.
 func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangeParams) ([]*v3.Series, error) {
 	q.queriesExecuted = append(q.queriesExecuted, params.Query)
 	if q.testingMode && q.reader == nil {
+		q.timeRanges = append(q.timeRanges, []int{int(params.Start.UnixMilli()), int(params.End.UnixMilli())})
 		return q.returnedSeries, q.returnedErr
 	}
 	promResult, _, err := q.reader.GetQueryRangeResult(ctx, params)
@@ -342,10 +345,10 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
 		wg.Add(1)
 		go func(queryName string, promQuery *v3.PromQuery) {
 			defer wg.Done()
-			cacheKey := cacheKeys[queryName]
+			cacheKey, ok := cacheKeys[queryName]
 			var cachedData []byte
 			// Ensure NoCache is not set and cache is not nil
-			if !params.NoCache && q.cache != nil {
+			if !params.NoCache && q.cache != nil && ok {
 				data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
 				zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
 				if err == nil {
@@ -373,7 +376,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
 			channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries}
 
 			// Cache the seriesList for future queries
-			if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
+			if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && ok {
 				mergedSeriesData, err := json.Marshal(mergedSeries)
 				if err != nil {
 					zap.L().Error("error marshalling merged series", zap.Error(err))
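
The essence of the fix is in the two hunks above: the cache-key lookup switches to Go's comma-ok form, and both the cache read and the later cache write are gated on ok, so queries for which no key was generated (value-type panels are not cached) never touch the cache. Below is a minimal, self-contained sketch of that guard pattern; apart from the comma-ok lookup itself, the names are illustrative and not taken from the SigNoz code.

package main

import "fmt"

// cache, retrieve, and store are stand-ins for the querier's cache plumbing;
// only the comma-ok guard mirrors the diff.
var cache = map[string][]byte{}

func retrieve(key string) ([]byte, bool) {
	data, found := cache[key]
	return data, found
}

func store(key string, data []byte) { cache[key] = data }

// runQuery shows the guard added above: the comma-ok lookup reports whether a
// cache key was generated for this query at all. Queries without a key (such
// as value-type panels) skip both the cache read and the cache write.
func runQuery(queryName string, cacheKeys map[string]string, noCache bool) []byte {
	cacheKey, ok := cacheKeys[queryName]

	if !noCache && ok {
		if data, found := retrieve(cacheKey); found {
			return data
		}
	}

	result := []byte("fresh result for " + queryName)
	if !noCache && ok {
		store(cacheKey, result)
	}
	return result
}

func main() {
	// "A" has a cache key (a series panel); "B" has none (a value panel).
	keys := map[string]string{"A": "key-A"}
	fmt.Println(string(runQuery("A", keys, false))) // computed, then cached
	fmt.Println(string(runQuery("A", keys, false))) // served from the cache
	fmt.Println(string(runQuery("B", keys, false))) // never touches the cache
}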
@@ -546,3 +549,7 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3,
 func (q *querier) QueriesExecuted() []string {
 	return q.queriesExecuted
 }
+
+func (q *querier) TimeRanges() [][]int {
+	return q.timeRanges
+}
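
The new fields and helpers above exist so tests can run the querier without a reader: in testing mode, execPromQuery records the query and its time window instead of calling Prometheus. Here is a stripped-down, self-contained sketch of that pattern; the field and method names echo the diff, but everything else is a stand-in rather than the actual SigNoz implementation.

package main

import (
	"errors"
	"fmt"
)

type series struct{ name string }

// querier mirrors only the test-support fields added in the diff; the real
// struct's reader, cache, and options are omitted here.
type querier struct {
	testingMode     bool
	queriesExecuted []string
	timeRanges      [][]int
	returnedSeries  []*series
	returnedErr     error
}

func (q *querier) execPromQuery(query string, startMs, endMs int64) ([]*series, error) {
	q.queriesExecuted = append(q.queriesExecuted, query)
	if q.testingMode {
		// Record what would have been executed and over which window
		// (milliseconds), then return the canned result.
		q.timeRanges = append(q.timeRanges, []int{int(startMs), int(endMs)})
		return q.returnedSeries, q.returnedErr
	}
	// The real implementation would call the reader here.
	return nil, errors.New("no reader configured")
}

func (q *querier) QueriesExecuted() []string { return q.queriesExecuted }
func (q *querier) TimeRanges() [][]int       { return q.timeRanges }

func main() {
	q := &querier{testingMode: true, returnedSeries: []*series{{name: "stub"}}}
	_, _ = q.execPromQuery("signoz_calls_total", 1675115596722, 1675115596722+2*60*60*1000)
	fmt.Println(q.QueriesExecuted(), q.TimeRanges())
}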
@@ -951,3 +951,102 @@ func TestQueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
 		}
 	}
 }
+
+func TestQueryRangeValueTypePromQL(t *testing.T) {
+	// There shouldn't be any caching for value panel type
+	params := []*v3.QueryRangeParamsV3{
+		{
+			Start: 1675115596722,
+			End:   1675115596722 + 120*60*1000,
+			Step:  5 * time.Minute.Milliseconds(),
+			CompositeQuery: &v3.CompositeQuery{
+				QueryType: v3.QueryTypePromQL,
+				PanelType: v3.PanelTypeValue,
+				PromQueries: map[string]*v3.PromQuery{
+					"A": {
+						Query: "signoz_calls_total",
+					},
+				},
+			},
+		},
+		{
+			Start: 1675115596722 + 60*60*1000,
+			End:   1675115596722 + 180*60*1000,
+			Step:  5 * time.Minute.Milliseconds(),
+			CompositeQuery: &v3.CompositeQuery{
+				QueryType: v3.QueryTypePromQL,
+				PanelType: v3.PanelTypeValue,
+				PromQueries: map[string]*v3.PromQuery{
+					"A": {
+						Query: "signoz_latency_bucket",
+					},
+				},
+			},
+		},
+	}
+	cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
+	opts := QuerierOptions{
+		Cache:        cache,
+		Reader:       nil,
+		FluxInterval: 5 * time.Minute,
+		KeyGenerator: queryBuilder.NewKeyGenerator(),
+
+		TestingMode: true,
+		ReturnedSeries: []*v3.Series{
+			{
+				Labels: map[string]string{
+					"method":       "GET",
+					"service_name": "test",
+					"__name__":     "doesn't matter",
+				},
+				Points: []v3.Point{
+					{Timestamp: 1675115596722, Value: 1},
+					{Timestamp: 1675115596722 + 60*60*1000, Value: 2},
+					{Timestamp: 1675115596722 + 120*60*1000, Value: 3},
+				},
+			},
+		},
+	}
+	q := NewQuerier(opts)
+
+	expectedQueryAndTimeRanges := []struct {
+		query  string
+		ranges []missInterval
+	}{
+		{
+			query: "signoz_calls_total",
+			ranges: []missInterval{
+				{start: 1675115596722, end: 1675115596722 + 120*60*1000},
+			},
+		},
+		{
+			query: "signoz_latency_bucket",
+			ranges: []missInterval{
+				{start: 1675115596722 + 60*60*1000, end: 1675115596722 + 180*60*1000},
+			},
+		},
+	}
+
+	for i, param := range params {
+		_, errByName, err := q.QueryRange(context.Background(), param, nil)
+		if err != nil {
+			t.Errorf("expected no error, got %s", err)
+		}
+		if len(errByName) > 0 {
+			t.Errorf("expected no error, got %v", errByName)
+		}
+
+		if !strings.Contains(q.QueriesExecuted()[i], expectedQueryAndTimeRanges[i].query) {
+			t.Errorf("expected query to contain %s, got %s", expectedQueryAndTimeRanges[i].query, q.QueriesExecuted()[i])
+		}
+		if len(q.TimeRanges()[i]) != 2 {
+			t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
+		}
+		if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].start) {
+			t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
+		}
+		if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].end) {
+			t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
+		}
+	}
+}
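
For orientation (this note is not part of the diff): because value panels bypass the cache, the test expects each request to be executed over its full window, even though the second request overlaps the first by an hour and a cached panel would only re-query the missing tail. The expected ranges, restated with the same millisecond timestamps the test uses:

package main

import "fmt"

func main() {
	base := int64(1675115596722) // test start, in milliseconds
	hour := int64(60 * 60 * 1000)

	// First request: base .. base+2h; the whole window is executed.
	first := []int64{base, base + 2*hour}

	// Second request: base+1h .. base+3h. It overlaps the first by an hour,
	// but with no caching for value panels the recorded range is still the
	// full requested window, not just the uncached last hour.
	second := []int64{base + 1*hour, base + 3*hour}

	fmt.Println(first, second)
}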
@@ -18,7 +18,7 @@ import (
 	"go.uber.org/zap"
 )
 
-func prepareLogsQuery(ctx context.Context,
+func prepareLogsQuery(_ context.Context,
 	start,
 	end int64,
 	builderQuery *v3.BuilderQuery,
@@ -50,8 +50,10 @@ type querier struct {
 	// TODO(srikanthccv): remove this once we have a proper mock
 	testingMode     bool
 	queriesExecuted []string
-	returnedSeries  []*v3.Series
-	returnedErr     error
+	// tuple of start and end time in milliseconds
+	timeRanges      [][]int
+	returnedSeries  []*v3.Series
+	returnedErr     error
 }
 
 type QuerierOptions struct {
@@ -117,6 +119,7 @@ func (q *querier) execClickHouseQuery(ctx context.Context, query string) ([]*v3.
 func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangeParams) ([]*v3.Series, error) {
 	q.queriesExecuted = append(q.queriesExecuted, params.Query)
 	if q.testingMode && q.reader == nil {
+		q.timeRanges = append(q.timeRanges, []int{int(params.Start.UnixMilli()), int(params.End.UnixMilli())})
 		return q.returnedSeries, q.returnedErr
 	}
 	promResult, _, err := q.reader.GetQueryRangeResult(ctx, params)
@@ -335,10 +338,10 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
 		wg.Add(1)
 		go func(queryName string, promQuery *v3.PromQuery) {
 			defer wg.Done()
-			cacheKey := cacheKeys[queryName]
+			cacheKey, ok := cacheKeys[queryName]
 			var cachedData []byte
 			// Ensure NoCache is not set and cache is not nil
-			if !params.NoCache && q.cache != nil {
+			if !params.NoCache && q.cache != nil && ok {
 				data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
 				zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
 				if err == nil {
@@ -366,7 +369,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
 			channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries}
 
 			// Cache the seriesList for future queries
-			if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
+			if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && ok {
 				mergedSeriesData, err := json.Marshal(mergedSeries)
 				if err != nil {
 					zap.L().Error("error marshalling merged series", zap.Error(err))
@@ -539,3 +542,7 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3,
 func (q *querier) QueriesExecuted() []string {
 	return q.queriesExecuted
 }
+
+func (q *querier) TimeRanges() [][]int {
+	return q.timeRanges
+}
pkg/query-service/app/querier/v2/querier_test.go (new file, 1060 lines)
File diff suppressed because it is too large
@@ -110,4 +110,5 @@ type Querier interface {
 
 	// test helpers
 	QueriesExecuted() []string
+	TimeRanges() [][]int
 }