diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index d65627bd92..1da4a5a46a 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -18,7 +18,7 @@ import ( "go.uber.org/zap" ) -func prepareLogsQuery(ctx context.Context, +func prepareLogsQuery(_ context.Context, start, end int64, builderQuery *v3.BuilderQuery, diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 7668d401cd..95cfc7cc73 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -50,8 +50,10 @@ type querier struct { // TODO(srikanthccv): remove this once we have a proper mock testingMode bool queriesExecuted []string - returnedSeries []*v3.Series - returnedErr error + // tuple of start and end time in milliseconds + timeRanges [][]int + returnedSeries []*v3.Series + returnedErr error } type QuerierOptions struct { @@ -117,6 +119,7 @@ func (q *querier) execClickHouseQuery(ctx context.Context, query string) ([]*v3. 
func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangeParams) ([]*v3.Series, error) { q.queriesExecuted = append(q.queriesExecuted, params.Query) if q.testingMode && q.reader == nil { + q.timeRanges = append(q.timeRanges, []int{int(params.Start.UnixMilli()), int(params.End.UnixMilli())}) return q.returnedSeries, q.returnedErr } promResult, _, err := q.reader.GetQueryRangeResult(ctx, params) @@ -342,10 +345,10 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam wg.Add(1) go func(queryName string, promQuery *v3.PromQuery) { defer wg.Done() - cacheKey := cacheKeys[queryName] + cacheKey, ok := cacheKeys[queryName] var cachedData []byte // Ensure NoCache is not set and cache is not nil - if !params.NoCache && q.cache != nil { + if !params.NoCache && q.cache != nil && ok { data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) if err == nil { @@ -373,7 +376,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries} // Cache the seriesList for future queries - if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && ok { mergedSeriesData, err := json.Marshal(mergedSeries) if err != nil { zap.L().Error("error marshalling merged series", zap.Error(err)) @@ -546,3 +549,7 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, func (q *querier) QueriesExecuted() []string { return q.queriesExecuted } + +func (q *querier) TimeRanges() [][]int { + return q.timeRanges +} diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index a9f8cc4030..5160c564da 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ 
b/pkg/query-service/app/querier/querier_test.go @@ -951,3 +951,102 @@ func TestQueryRangeTimeShiftWithLimitAndCache(t *testing.T) { } } } + +func TestQueryRangeValueTypePromQL(t *testing.T) { + // There shouldn't be any caching for value panel type + params := []*v3.QueryRangeParamsV3{ + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypePromQL, + PanelType: v3.PanelTypeValue, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "signoz_calls_total", + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 180*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypePromQL, + PanelType: v3.PanelTypeValue, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "signoz_latency_bucket", + }, + }, + }, + }, + } + cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute}) + opts := QuerierOptions{ + Cache: cache, + Reader: nil, + FluxInterval: 5 * time.Minute, + KeyGenerator: queryBuilder.NewKeyGenerator(), + + TestingMode: true, + ReturnedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "method": "GET", + "service_name": "test", + "__name__": "doesn't matter", + }, + Points: []v3.Point{ + {Timestamp: 1675115596722, Value: 1}, + {Timestamp: 1675115596722 + 60*60*1000, Value: 2}, + {Timestamp: 1675115596722 + 120*60*1000, Value: 3}, + }, + }, + }, + } + q := NewQuerier(opts) + + expectedQueryAndTimeRanges := []struct { + query string + ranges []missInterval + }{ + { + query: "signoz_calls_total", + ranges: []missInterval{ + {start: 1675115596722, end: 1675115596722 + 120*60*1000}, + }, + }, + { + query: "signoz_latency_bucket", + ranges: []missInterval{ + {start: 1675115596722 + 60*60*1000, end: 1675115596722 + 180*60*1000}, + }, + }, + } + + for i, param := range params { + _, errByName, err := 
q.QueryRange(context.Background(), param, nil) + if err != nil { + t.Errorf("expected no error, got %s", err) + } + if len(errByName) > 0 { + t.Errorf("expected no error, got %v", errByName) + } + + if !strings.Contains(q.QueriesExecuted()[i], expectedQueryAndTimeRanges[i].query) { + t.Errorf("expected query to contain %s, got %s", expectedQueryAndTimeRanges[i].query, q.QueriesExecuted()[i]) + } + if len(q.TimeRanges()[i]) != 2 { + t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) + } + if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].start) { + t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) + } + if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].end) { + t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i]) + } + } +} diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 04f798ad1b..9df9965b5c 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -18,7 +18,7 @@ import ( "go.uber.org/zap" ) -func prepareLogsQuery(ctx context.Context, +func prepareLogsQuery(_ context.Context, start, end int64, builderQuery *v3.BuilderQuery, diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index e6915ef078..b8a8a9e92e 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -50,8 +50,10 @@ type querier struct { // TODO(srikanthccv): remove this once we have a proper mock testingMode bool queriesExecuted []string - returnedSeries []*v3.Series - returnedErr error + // tuple of start and end time in milliseconds + timeRanges [][]int + returnedSeries []*v3.Series + returnedErr error } type QuerierOptions struct { @@ -117,6 +119,7 @@ func (q *querier) 
execClickHouseQuery(ctx context.Context, query string) ([]*v3. func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangeParams) ([]*v3.Series, error) { q.queriesExecuted = append(q.queriesExecuted, params.Query) if q.testingMode && q.reader == nil { + q.timeRanges = append(q.timeRanges, []int{int(params.Start.UnixMilli()), int(params.End.UnixMilli())}) return q.returnedSeries, q.returnedErr } promResult, _, err := q.reader.GetQueryRangeResult(ctx, params) @@ -335,10 +338,10 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam wg.Add(1) go func(queryName string, promQuery *v3.PromQuery) { defer wg.Done() - cacheKey := cacheKeys[queryName] + cacheKey, ok := cacheKeys[queryName] var cachedData []byte // Ensure NoCache is not set and cache is not nil - if !params.NoCache && q.cache != nil { + if !params.NoCache && q.cache != nil && ok { data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String())) if err == nil { @@ -366,7 +369,7 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries} // Cache the seriesList for future queries - if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && ok { mergedSeriesData, err := json.Marshal(mergedSeries) if err != nil { zap.L().Error("error marshalling merged series", zap.Error(err)) @@ -539,3 +542,7 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, func (q *querier) QueriesExecuted() []string { return q.queriesExecuted } + +func (q *querier) TimeRanges() [][]int { + return q.timeRanges +} diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go new file mode 100644 index 0000000000..d29785b310 --- /dev/null 
+++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -0,0 +1,1060 @@ +package v2 + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" + "go.signoz.io/signoz/pkg/query-service/cache/inmemory" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestV2FindMissingTimeRangesZeroFreshNess(t *testing.T) { + // There are five scenarios: + // 1. Cached time range is a subset of the requested time range + // 2. Cached time range is a superset of the requested time range + // 3. Cached time range is a left overlap of the requested time range + // 4. Cached time range is a right overlap of the requested time range + // 5. Cached time range is a disjoint of the requested time range + testCases := []struct { + name string + requestedStart int64 // in milliseconds + requestedEnd int64 // in milliseconds + requestedStep int64 // in seconds + cachedSeries []*v3.Series + expectedMiss []missInterval + }{ + { + name: "cached time range is a subset of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 60*60*1000 - 1, + }, + { + start: 1675115596722 + 120*60*1000 + 1, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + { + name: "cached time range is a superset of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722, + 
Value: 1, + }, + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 180*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{}, + }, + { + name: "cached time range is a left overlap of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722, + Value: 1, + }, + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{ + { + start: 1675115596722 + 120*60*1000 + 1, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + { + name: "cached time range is a right overlap of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 180*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 60*60*1000 - 1, + }, + }, + }, + { + name: "cached time range is a disjoint of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 240*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 300*60*1000, + Value: 1, + }, + { + Timestamp: 
1675115596722 + 360*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, 0*time.Minute) + if len(misses) != len(tc.expectedMiss) { + t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) + } + for i, miss := range misses { + if miss.start != tc.expectedMiss[i].start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + } + if miss.end != tc.expectedMiss[i].end { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + } + } + }) + } +} + +func TestV2FindMissingTimeRangesWithFluxInterval(t *testing.T) { + + testCases := []struct { + name string + requestedStart int64 + requestedEnd int64 + requestedStep int64 + cachedSeries []*v3.Series + fluxInterval time.Duration + expectedMiss []missInterval + }{ + { + name: "cached time range is a subset of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 60*60*1000 - 1, + }, + { + start: 1675115596722 + 120*60*1000 + 1, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + { + name: "cached time range is a superset of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": 
"http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722, + Value: 1, + }, + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 180*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{}, + }, + { + name: "cache time range is a left overlap of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722, + Value: 1, + }, + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{ + { + start: 1675115596722 + 120*60*1000 + 1, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + { + name: "cache time range is a right overlap of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 180*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 60*60*1000 - 1, + }, + }, + }, + { + name: "cache time range is a disjoint of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + requestedStep: 60, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": 
"http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 240*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 300*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 360*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.requestedStep, tc.cachedSeries, tc.fluxInterval) + if len(misses) != len(tc.expectedMiss) { + t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) + } + for i, miss := range misses { + if miss.start != tc.expectedMiss[i].start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + } + if miss.end != tc.expectedMiss[i].end { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + } + } + }) + } +} + +func TestV2QueryRange(t *testing.T) { + params := []*v3.QueryRangeParamsV3{ + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + Temporality: v3.Delta, + StepInterval: 60, + AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "service_name", IsColumn: false}, + {Key: "method", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorSumRate, + TimeAggregation: v3.TimeAggregationRate, + 
SpaceAggregation: v3.SpaceAggregationSum, + Expression: "A", + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 180*60*1000, + Step: 60, + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + Temporality: v3.Delta, + StepInterval: 60, + AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + DataSource: v3.DataSourceMetrics, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "service_name", IsColumn: false}, + {Key: "method", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorSumRate, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Expression: "A", + }, + }, + }, + }, + // No caching for traces yet + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + StepInterval: 60, + DataSource: v3.DataSourceTraces, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "serviceName", IsColumn: false}, + {Key: "name", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 
180*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + StepInterval: 60, + DataSource: v3.DataSourceTraces, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "serviceName", IsColumn: false}, + {Key: "name", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + }, + }, + }, + }, + } + cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + opts := QuerierOptions{ + Cache: cache, + Reader: nil, + FluxInterval: 5 * time.Minute, + KeyGenerator: queryBuilder.NewKeyGenerator(), + + TestingMode: true, + ReturnedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "method": "GET", + "service_name": "test", + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + {Timestamp: 1675115596722, Value: 1}, + {Timestamp: 1675115596722 + 60*60*1000, Value: 2}, + {Timestamp: 1675115596722 + 120*60*1000, Value: 3}, + }, + }, + }, + } + q := NewQuerier(opts) + expectedTimeRangeInQueryString := []string{ + fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000, 1675115580000+120*60*1000), + fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", 1675115580000*1000000, (1675115580000+120*60*1000)*int64(1000000)), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)), + } + + for i, param := range params { 
+ _, errByName, err := q.QueryRange(context.Background(), param, nil) + if err != nil { + t.Errorf("expected no error, got %s", err) + } + if len(errByName) > 0 { + t.Errorf("expected no error, got %v", errByName) + } + + if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) { + t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i]) + } + } +} + +func TestV2QueryRangeValueType(t *testing.T) { + // There shouldn't be any caching for value panel type + params := []*v3.QueryRangeParamsV3{ + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeValue, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + AggregateOperator: v3.AggregateOperatorSumRate, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Expression: "A", + ReduceTo: v3.ReduceToOperatorLast, + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 180*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeValue, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceTraces, + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: 
[]v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + ReduceTo: v3.ReduceToOperatorLast, + }, + }, + }, + }, + } + cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute}) + opts := QuerierOptions{ + Cache: cache, + Reader: nil, + FluxInterval: 5 * time.Minute, + KeyGenerator: queryBuilder.NewKeyGenerator(), + + TestingMode: true, + ReturnedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "method": "GET", + "service_name": "test", + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + {Timestamp: 1675115596722, Value: 1}, + {Timestamp: 1675115596722 + 60*60*1000, Value: 2}, + {Timestamp: 1675115596722 + 120*60*1000, Value: 3}, + }, + }, + }, + } + q := NewQuerier(opts) + // No caching + expectedTimeRangeInQueryString := []string{ + fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)), + } + + for i, param := range params { + _, errByName, err := q.QueryRange(context.Background(), param, nil) + if err != nil { + t.Errorf("expected no error, got %s", err) + } + if len(errByName) > 0 { + t.Errorf("expected no error, got %v", errByName) + } + + if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) { + t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i]) + } + } +} + +// test timeshift +func TestV2QueryRangeTimeShift(t *testing.T) { + params := []*v3.QueryRangeParamsV3{ + { + Start: 1675115596722, //31, 3:23 + End: 1675115596722 + 120*60*1000, //31, 5:23 + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + 
BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + AggregateAttribute: v3.AttributeKey{}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + ShiftBy: 86400, + }, + }, + }, + }, + } + opts := QuerierOptions{ + Reader: nil, + FluxInterval: 5 * time.Minute, + KeyGenerator: queryBuilder.NewKeyGenerator(), + TestingMode: true, + } + q := NewQuerier(opts) + // logs queries are generates in ns + expectedTimeRangeInQueryString := fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722-86400*1000)*1000000, ((1675115596722+120*60*1000)-86400*1000)*1000000) + + for i, param := range params { + _, errByName, err := q.QueryRange(context.Background(), param, nil) + if err != nil { + t.Errorf("expected no error, got %s", err) + } + if len(errByName) > 0 { + t.Errorf("expected no error, got %v", errByName) + } + if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString) { + t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString, q.QueriesExecuted()[i]) + } + } +} + +// timeshift works with caching +func TestV2QueryRangeTimeShiftWithCache(t *testing.T) { + params := []*v3.QueryRangeParamsV3{ + { + Start: 1675115596722 + 60*60*1000 - 86400*1000, //30, 4:23 + End: 1675115596722 + 120*60*1000 - 86400*1000, //30, 5:23 + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + AggregateAttribute: v3.AttributeKey{}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + GroupBy: []v3.AttributeKey{ + {Key: "service_name", IsColumn: false}, + {Key: "method", IsColumn: 
false}, + }, + }, + }, + }, + }, + { + Start: 1675115596722, //31, 3:23 + End: 1675115596722 + 120*60*1000, //31, 5:23 + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceLogs, + AggregateAttribute: v3.AttributeKey{}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + AggregateOperator: v3.AggregateOperatorCount, + Expression: "A", + ShiftBy: 86400, + GroupBy: []v3.AttributeKey{ + {Key: "service_name", IsColumn: false}, + {Key: "method", IsColumn: false}, + }, + }, + }, + }, + }, + } + cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute}) + opts := QuerierOptions{ + Cache: cache, + Reader: nil, + FluxInterval: 5 * time.Minute, + KeyGenerator: queryBuilder.NewKeyGenerator(), + TestingMode: true, + ReturnedSeries: []*v3.Series{ + { + Labels: map[string]string{}, + Points: []v3.Point{ + {Timestamp: 1675115596722 + 60*60*1000 - 86400*1000, Value: 1}, + {Timestamp: 1675115596722 + 120*60*1000 - 86400*1000 + 60*60*1000, Value: 2}, + }, + }, + }, + } + q := NewQuerier(opts) + + // logs queries are generates in ns + expectedTimeRangeInQueryString := []string{ + fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722+60*60*1000-86400*1000)*1000000, (1675115596722+120*60*1000-86400*1000)*1000000), + fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722-86400*1000)*1000000, ((1675115596722+60*60*1000)-86400*1000-1)*1000000), + } + + for i, param := range params { + _, errByName, err := q.QueryRange(context.Background(), param, nil) + if err != nil { + t.Errorf("expected no error, got %s", err) + } + if len(errByName) > 0 { + t.Errorf("expected no error, got %v", errByName) + } + if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) { + 
			t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i])
		}
	}
}

// timeshift with limit queries
//
// TestV2QueryRangeTimeShiftWithLimitAndCache runs a time-shifted (ShiftBy)
// logs builder query with a group-by limit twice: first for the shifted
// window itself (seeding the cache), then for the un-shifted window whose
// shifted lookup should be partially served from cache. It asserts, via the
// querier's testing-mode QueriesExecuted() recorder, that each executed query
// string carries exactly the expected timestamp range — the second query only
// covering the cache-missed portion (note the trailing `-1` ms to avoid
// re-fetching the cached boundary point).
func TestV2QueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
	params := []*v3.QueryRangeParamsV3{
		{
			Start: 1675115596722 + 60*60*1000 - 86400*1000,  //30, 4:23
			End:   1675115596722 + 120*60*1000 - 86400*1000, //30, 5:23
			Step:  5 * time.Minute.Milliseconds(),
			CompositeQuery: &v3.CompositeQuery{
				QueryType: v3.QueryTypeBuilder,
				PanelType: v3.PanelTypeGraph,
				BuilderQueries: map[string]*v3.BuilderQuery{
					"A": {
						QueryName:          "A",
						StepInterval:       60,
						DataSource:         v3.DataSourceLogs,
						AggregateAttribute: v3.AttributeKey{},
						Filters: &v3.FilterSet{
							Operator: "AND",
							Items:    []v3.FilterItem{},
						},
						AggregateOperator: v3.AggregateOperatorCount,
						Expression:        "A",
						GroupBy: []v3.AttributeKey{
							{Key: "service_name", IsColumn: false},
							{Key: "method", IsColumn: false},
						},
						Limit: 5,
					},
				},
			},
		},
		{
			Start: 1675115596722,                //31, 3:23
			End:   1675115596722 + 120*60*1000,  //31, 5:23
			Step:  5 * time.Minute.Milliseconds(),
			CompositeQuery: &v3.CompositeQuery{
				QueryType: v3.QueryTypeBuilder,
				PanelType: v3.PanelTypeGraph,
				BuilderQueries: map[string]*v3.BuilderQuery{
					"A": {
						QueryName:          "A",
						StepInterval:       60,
						DataSource:         v3.DataSourceLogs,
						AggregateAttribute: v3.AttributeKey{},
						Filters: &v3.FilterSet{
							Operator: "AND",
							Items:    []v3.FilterItem{},
						},
						AggregateOperator: v3.AggregateOperatorCount,
						Expression:        "A",
						// shift the query window back one day (seconds)
						ShiftBy: 86400,
						GroupBy: []v3.AttributeKey{
							{Key: "service_name", IsColumn: false},
							{Key: "method", IsColumn: false},
						},
						Limit: 5,
					},
				},
			},
		},
	}
	cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
	opts := QuerierOptions{
		Cache:        cache,
		Reader:       nil,
		FluxInterval: 5 * time.Minute,
		KeyGenerator: queryBuilder.NewKeyGenerator(),
		TestingMode:  true,
		// canned series returned by the querier in testing mode; its points
		// populate the cache for the first window
		ReturnedSeries: []*v3.Series{
			{
				Labels: map[string]string{},
				Points: []v3.Point{
					{Timestamp: 1675115596722 + 60*60*1000 - 86400*1000, Value: 1},
					{Timestamp: 1675115596722 + 120*60*1000 - 86400*1000 + 60*60*1000, Value: 2},
				},
			},
		},
	}
	q := NewQuerier(opts)

	// logs queries are generated in ns
	expectedTimeRangeInQueryString := []string{
		fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722+60*60*1000-86400*1000)*1000000, (1675115596722+120*60*1000-86400*1000)*1000000),
		// second run: only the cache-missed prefix of the shifted window,
		// ending 1 ms before the cached data begins
		fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722-86400*1000)*1000000, ((1675115596722+60*60*1000)-86400*1000-1)*1000000),
	}

	for i, param := range params {
		_, errByName, err := q.QueryRange(context.Background(), param, nil)
		if err != nil {
			t.Errorf("expected no error, got %s", err)
		}
		if len(errByName) > 0 {
			t.Errorf("expected no error, got %v", errByName)
		}
		if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) {
			t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i])
		}
	}
}

// TestV2QueryRangeValueTypePromQL runs two PromQL queries with the Value
// panel type and asserts — via the testing-mode QueriesExecuted() and
// TimeRanges() recorders — that each query is executed over its FULL
// requested [start, end] range even though the two ranges overlap.
// In other words, value-panel PromQL results are never served from cache.
func TestV2QueryRangeValueTypePromQL(t *testing.T) {
	// There shouldn't be any caching for value panel type
	params := []*v3.QueryRangeParamsV3{
		{
			Start: 1675115596722,
			End:   1675115596722 + 120*60*1000,
			Step:  5 * time.Minute.Milliseconds(),
			CompositeQuery: &v3.CompositeQuery{
				QueryType: v3.QueryTypePromQL,
				PanelType: v3.PanelTypeValue,
				PromQueries: map[string]*v3.PromQuery{
					"A": {
						Query: "signoz_calls_total",
					},
				},
			},
		},
		{
			// overlaps the first range by one hour; with caching this would
			// have produced a narrower miss interval
			Start: 1675115596722 + 60*60*1000,
			End:   1675115596722 + 180*60*1000,
			Step:  5 * time.Minute.Milliseconds(),
			CompositeQuery: &v3.CompositeQuery{
				QueryType: v3.QueryTypePromQL,
				PanelType: v3.PanelTypeValue,
				PromQueries: map[string]*v3.PromQuery{
					"A": {
						Query: "signoz_latency_bucket",
					},
				},
			},
		},
	}
	cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
	opts := QuerierOptions{
		Cache:        cache,
		Reader:       nil,
		FluxInterval: 5 * time.Minute,
		KeyGenerator: queryBuilder.NewKeyGenerator(),

		TestingMode: true,
		ReturnedSeries: []*v3.Series{
			{
				Labels: map[string]string{
					"method":       "GET",
					"service_name": "test",
					"__name__":     "doesn't matter",
				},
				Points: []v3.Point{
					{Timestamp: 1675115596722, Value: 1},
					{Timestamp: 1675115596722 + 60*60*1000, Value: 2},
					{Timestamp: 1675115596722 + 120*60*1000, Value: 3},
				},
			},
		},
	}
	q := NewQuerier(opts)

	// each entry pairs the PromQL expression with the single (full-range)
	// miss interval the querier is expected to execute it over
	expectedQueryAndTimeRanges := []struct {
		query  string
		ranges []missInterval
	}{
		{
			query: "signoz_calls_total",
			ranges: []missInterval{
				{start: 1675115596722, end: 1675115596722 + 120*60*1000},
			},
		},
		{
			query: "signoz_latency_bucket",
			ranges: []missInterval{
				{start: 1675115596722 + 60*60*1000, end: 1675115596722 + 180*60*1000},
			},
		},
	}

	for i, param := range params {
		_, errByName, err := q.QueryRange(context.Background(), param, nil)
		if err != nil {
			t.Errorf("expected no error, got %s", err)
		}
		if len(errByName) > 0 {
			t.Errorf("expected no error, got %v", errByName)
		}

		if !strings.Contains(q.QueriesExecuted()[i], expectedQueryAndTimeRanges[i].query) {
			t.Errorf("expected query to contain %s, got %s", expectedQueryAndTimeRanges[i].query, q.QueriesExecuted()[i])
		}
		// TimeRanges()[i] is a [start, end] pair in milliseconds
		if len(q.TimeRanges()[i]) != 2 {
			t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
		}
		if q.TimeRanges()[i][0] != int(expectedQueryAndTimeRanges[i].ranges[0].start) {
			t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
		}
		if q.TimeRanges()[i][1] != int(expectedQueryAndTimeRanges[i].ranges[0].end) {
			t.Errorf("expected time ranges to be %v, got %v", expectedQueryAndTimeRanges[i].ranges, q.TimeRanges()[i])
		}
	}
}
diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 0ab20fed0e..385d48173b 100644 ---
a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -110,4 +110,5 @@ type Querier interface { // test helpers QueriesExecuted() []string + TimeRanges() [][]int }