feat: add querier interface and initial implementation (#2782)

Srikanth Chekuri 2023-06-08 15:46:18 +05:30 committed by GitHub
parent 7415de4751
commit 04a9de1e32
7 changed files with 1069 additions and 2 deletions

@@ -140,5 +140,6 @@ test:
go test ./pkg/query-service/app/metrics/...
go test ./pkg/query-service/cache/...
go test ./pkg/query-service/app/...
go test ./pkg/query-service/app/querier/...
go test ./pkg/query-service/converter/...
go test ./pkg/query-service/formatter/...

@@ -3,8 +3,10 @@ package v3
import (
"fmt"
"strings"
"time"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/utils"
)
@@ -403,3 +405,12 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P
}
return query, err
}
func BuildPromQuery(promQuery *v3.PromQuery, step, start, end int64) *model.QueryRangeParams {
return &model.QueryRangeParams{
Query: promQuery.Query,
Start: time.UnixMilli(start),
End: time.UnixMilli(end),
Step: time.Duration(step * int64(time.Second)),
}
}
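
A usage sketch with hypothetical values (start and end are Unix milliseconds; step is in seconds, per the time.Second conversion above):

promQuery := &v3.PromQuery{Query: `sum(rate(http_server_requests_seconds_count[5m]))`}
// 1-hour window at a 60-second resolution step
rangeParams := BuildPromQuery(promQuery, 60, 1675115596722, 1675115596722+60*60*1000)
// rangeParams.Start == time.UnixMilli(1675115596722)
// rangeParams.Step  == 60 * time.Second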

@@ -0,0 +1,426 @@
package querier
import (
"context"
"encoding/json"
"fmt"
"math"
"sort"
"strings"
"time"
logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/cache/status"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
type missInterval struct {
start, end int64 // in milliseconds
}
type querier struct {
cache cache.Cache
reader interfaces.Reader
keyGenerator cache.KeyGenerator
fluxInterval time.Duration
// used for testing
// TODO(srikanthccv): remove this once we have a proper mock
testingMode bool
queriesExecuted []string
returnedSeries []*v3.Series
returnedErr error
}
type QuerierOptions struct {
Reader interfaces.Reader
Cache cache.Cache
KeyGenerator cache.KeyGenerator
FluxInterval time.Duration
// used for testing
TestingMode bool
ReturnedSeries []*v3.Series
ReturnedErr error
}
func NewQuerier(opts QuerierOptions) interfaces.Querier {
return &querier{
cache: opts.Cache,
reader: opts.Reader,
keyGenerator: opts.KeyGenerator,
fluxInterval: opts.FluxInterval,
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
}
}
func (q *querier) execClickHouseQuery(ctx context.Context, query string) ([]*v3.Series, error) {
q.queriesExecuted = append(q.queriesExecuted, query)
if q.testingMode && q.reader == nil {
return q.returnedSeries, q.returnedErr
}
return q.reader.GetTimeSeriesResultV3(ctx, query)
}
func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangeParams) ([]*v3.Series, error) {
q.queriesExecuted = append(q.queriesExecuted, params.Query)
if q.testingMode && q.reader == nil {
return q.returnedSeries, q.returnedErr
}
promResult, _, err := q.reader.GetQueryRangeResult(ctx, params)
if err != nil {
return nil, err
}
matrix, promErr := promResult.Matrix()
if promErr != nil {
return nil, promErr
}
var seriesList []*v3.Series
for _, v := range matrix {
var s v3.Series
s.Labels = v.Metric.Copy().Map()
for idx := range v.Points {
p := v.Points[idx]
s.Points = append(s.Points, v3.Point{Timestamp: p.T, Value: p.V})
}
seriesList = append(seriesList, &s)
}
return seriesList, nil
}
// findMissingTimeRanges finds the missing time ranges in the seriesList
// and returns them as a list of miss structs. It takes the fluxInterval
// into account when computing the misses.
//
// The [End - fluxInterval, End] is always added to the list of misses, because
// the data might still be in flux and not yet available in the database.
func findMissingTimeRanges(start, end int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval) {
var cachedStart, cachedEnd int64
for idx := range seriesList {
series := seriesList[idx]
for pointIdx := range series.Points {
point := series.Points[pointIdx]
if cachedStart == 0 || point.Timestamp < cachedStart {
cachedStart = point.Timestamp
}
if cachedEnd == 0 || point.Timestamp > cachedEnd {
cachedEnd = point.Timestamp
}
}
}
// Exclude the flux interval from the cached end time
cachedEnd = int64(
math.Min(
float64(cachedEnd),
float64(time.Now().UnixMilli()-fluxInterval.Milliseconds()),
),
)
// There are five cases to consider
// 1. Cached time range is a subset of the requested time range
// 2. Cached time range is a superset of the requested time range
// 3. Cached time range is a left overlap of the requested time range
// 4. Cached time range is a right overlap of the requested time range
// 5. Cached time range is disjoint from the requested time range
if cachedStart >= start && cachedEnd <= end {
// Case 1: Cached time range is a subset of the requested time range
// Add misses for the left and right sides of the cached time range
misses = append(misses, missInterval{start: start, end: cachedStart - 1})
misses = append(misses, missInterval{start: cachedEnd + 1, end: end})
} else if cachedStart <= start && cachedEnd >= end {
// Case 2: Cached time range is a superset of the requested time range
// No misses
} else if cachedStart <= start && cachedEnd >= start {
// Case 3: Cached time range is a left overlap of the requested time range
// Add a miss for the left side of the cached time range
misses = append(misses, missInterval{start: cachedEnd + 1, end: end})
} else if cachedStart <= end && cachedEnd >= end {
// Case 4: Cached time range is a right overlap of the requested time range
// Add a miss for the right side of the cached time range
misses = append(misses, missInterval{start: start, end: cachedStart - 1})
} else {
// Case 5: Cached time range is disjoint from the requested time range
// Add a miss for the entire requested time range
misses = append(misses, missInterval{start: start, end: end})
}
// remove the structs with start > end
var validMisses []missInterval
for idx := range misses {
miss := misses[idx]
if miss.start <= miss.end {
validMisses = append(validMisses, miss)
}
}
return validMisses
}
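
For example, with a requested window of [start, start+3h] and cached points at start+1h and start+2h, case 1 applies and two misses flank the cached range, matching the first test case below:

// A sketch of case 1 with hour-granularity points (values hypothetical).
start := int64(1675115596722)
hour := int64(60 * 60 * 1000)
cached := []*v3.Series{{Points: []v3.Point{{Timestamp: start + hour}, {Timestamp: start + 2*hour}}}}
misses := findMissingTimeRanges(start, start+3*hour, cached, 0)
// misses == [{start, start+hour-1}, {start+2*hour+1, start+3*hour}]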
// findMissingTimeRanges finds the missing time ranges in the cached data
// and returns them as a list of misses
func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval) {
var cachedSeriesList []*v3.Series
if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil {
// In case of error, we return the entire range as a miss
return []missInterval{{start: start, end: end}}
}
return findMissingTimeRanges(start, end, cachedSeriesList, q.fluxInterval)
}
func labelsToString(labels map[string]string) string {
type label struct {
Key string
Value string
}
var labelsList []label
for k, v := range labels {
labelsList = append(labelsList, label{Key: k, Value: v})
}
sort.Slice(labelsList, func(i, j int) bool {
return labelsList[i].Key < labelsList[j].Key
})
labelKVs := make([]string, len(labelsList))
for idx := range labelsList {
labelKVs[idx] = labelsList[idx].Key + "=" + labelsList[idx].Value
}
return fmt.Sprintf("{%s}", strings.Join(labelKVs, ","))
}
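
Because the keys are sorted, equal label sets always serialize to the same string, e.g.:

labelsToString(map[string]string{"service_name": "test", "method": "GET"})
// -> "{method=GET,service_name=test}"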
func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
// Merge the missed series with the cached series by timestamp
mergedSeries := make([]*v3.Series, 0)
seriesesByLabels := make(map[string]*v3.Series)
for idx := range cachedSeries {
series := cachedSeries[idx]
seriesesByLabels[labelsToString(series.Labels)] = series
}
for idx := range missedSeries {
series := missedSeries[idx]
if _, ok := seriesesByLabels[labelsToString(series.Labels)]; !ok {
seriesesByLabels[labelsToString(series.Labels)] = series
continue
}
seriesesByLabels[labelsToString(series.Labels)].Points = append(seriesesByLabels[labelsToString(series.Labels)].Points, series.Points...)
}
// Sort the points in each series by timestamp
for idx := range seriesesByLabels {
series := seriesesByLabels[idx]
series.SortPoints()
mergedSeries = append(mergedSeries, series)
}
return mergedSeries
}
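
A minimal sketch of the merge (hypothetical points; the two inputs are fragments of one series with identical labels):

cached := []*v3.Series{{Labels: map[string]string{"method": "GET"}, Points: []v3.Point{{Timestamp: 200, Value: 2}}}}
missed := []*v3.Series{{Labels: map[string]string{"method": "GET"}, Points: []v3.Point{{Timestamp: 100, Value: 1}}}}
merged := mergeSerieses(cached, missed)
// one series, points sorted by timestamp: [{100 1} {200 2}]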
func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, fields map[string]v3.AttributeKey, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) {
cacheKeys := q.keyGenerator.GenerateKeys(params)
seriesList := make([]*v3.Series, 0)
errQueriesByName := make(map[string]string)
var err error
for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
// TODO: add caching support for logs and traces (these run uncached below)
if builderQuery.DataSource == v3.DataSourceLogs {
query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, fields)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
seriesList = append(seriesList, series...)
continue
}
if builderQuery.DataSource == v3.DataSourceTraces {
query, err := tracesV3.PrepareTracesQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, keys)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
seriesList = append(seriesList, series...)
continue
}
cacheKey := cacheKeys[queryName]
var cachedData []byte
if !params.NoCache {
var retrieveStatus status.RetrieveStatus
cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true)
zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err != nil {
return nil, err, nil
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
query, err := metricsV3.PrepareMetricQuery(
miss.start,
miss.end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
errQueriesByName[queryName] = err.Error()
continue
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
seriesList = append(seriesList, mergedSeries...)
// Cache the seriesList for future queries
if len(missedSeries) > 0 {
mergedSeriesData, err := json.Marshal(mergedSeries)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
}
}
if len(errQueriesByName) > 0 {
err = fmt.Errorf("error in builder queries")
}
return seriesList, err, errQueriesByName
}
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) {
seriesList := make([]*v3.Series, 0)
errQueriesByName := make(map[string]string)
var err error
for queryName, promQuery := range params.CompositeQuery.PromQueries {
cacheKey := q.keyGenerator.GenerateKeys(params)[queryName]
var cachedData []byte
var retrieveStatus status.RetrieveStatus
if !params.NoCache {
cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true)
zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
}
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
query := metricsV3.BuildPromQuery(
promQuery,
params.Step,
miss.start,
miss.end,
)
series, err := q.execPromQuery(ctx, query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
errQueriesByName[queryName] = err.Error()
continue
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
seriesList = append(seriesList, mergedSeries...)
// Cache the seriesList for future queries
if len(missedSeries) > 0 {
mergedSeriesData, err := json.Marshal(mergedSeries)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
}
}
if len(errQueriesByName) > 0 {
err = fmt.Errorf("error in prom queries")
}
return seriesList, err, errQueriesByName
}
func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) {
seriesList := make([]*v3.Series, 0)
errQueriesByName := make(map[string]string)
var err error
for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries {
series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
seriesList = append(seriesList, series...)
}
if len(errQueriesByName) > 0 {
err = fmt.Errorf("error in clickhouse queries")
}
return seriesList, err, errQueriesByName
}
func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, fields map[string]v3.AttributeKey, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) {
var seriesList []*v3.Series
var err error
var errQueriesByName map[string]string
if params.CompositeQuery != nil {
switch params.CompositeQuery.QueryType {
case v3.QueryTypeBuilder:
seriesList, err, errQueriesByName = q.runBuilderQueries(ctx, params, fields, keys)
case v3.QueryTypePromQL:
seriesList, err, errQueriesByName = q.runPromQueries(ctx, params)
case v3.QueryTypeClickHouseSQL:
seriesList, err, errQueriesByName = q.runClickHouseQueries(ctx, params)
default:
err = fmt.Errorf("invalid query type")
}
}
return seriesList, err, errQueriesByName
}
func (q *querier) QueriesExecuted() []string {
return q.queriesExecuted
}

@@ -0,0 +1,507 @@
package querier
import (
"context"
"fmt"
"strings"
"testing"
"time"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) {
// There are five scenarios:
// 1. Cached time range is a subset of the requested time range
// 2. Cached time range is a superset of the requested time range
// 3. Cached time range is a left overlap of the requested time range
// 4. Cached time range is a right overlap of the requested time range
// 5. Cached time range is disjoint from the requested time range
testCases := []struct {
name string
requestedStart int64 // in milliseconds
requestedEnd int64 // in milliseconds
cachedSeries []*v3.Series
expectedMiss []missInterval
}{
{
name: "cached time range is a subset of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722 + 60*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 120*60*1000,
Value: 1,
},
},
},
},
expectedMiss: []missInterval{
{
start: 1675115596722,
end: 1675115596722 + 60*60*1000 - 1,
},
{
start: 1675115596722 + 120*60*1000 + 1,
end: 1675115596722 + 180*60*1000,
},
},
},
{
name: "cached time range is a superset of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722,
Value: 1,
},
{
Timestamp: 1675115596722 + 60*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 120*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 180*60*1000,
Value: 1,
},
},
},
},
expectedMiss: []missInterval{},
},
{
name: "cached time range is a left overlap of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722,
Value: 1,
},
{
Timestamp: 1675115596722 + 60*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 120*60*1000,
Value: 1,
},
},
},
},
expectedMiss: []missInterval{
{
start: 1675115596722 + 120*60*1000 + 1,
end: 1675115596722 + 180*60*1000,
},
},
},
{
name: "cached time range is a right overlap of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722 + 60*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 120*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 180*60*1000,
Value: 1,
},
},
},
},
expectedMiss: []missInterval{
{
start: 1675115596722,
end: 1675115596722 + 60*60*1000 - 1,
},
},
},
{
name: "cached time range is a disjoint of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722 + 240*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 300*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 360*60*1000,
Value: 1,
},
},
},
},
expectedMiss: []missInterval{
{
start: 1675115596722,
end: 1675115596722 + 180*60*1000,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.cachedSeries, 0*time.Minute)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
for i, miss := range misses {
if miss.start != tc.expectedMiss[i].start {
t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start)
}
if miss.end != tc.expectedMiss[i].end {
t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end)
}
}
})
}
}
func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) {
testCases := []struct {
name string
requestedStart int64
requestedEnd int64
cachedSeries []*v3.Series
fluxInterval time.Duration
expectedMiss []missInterval
}{
{
name: "cached time range is a subset of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722 + 60*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 120*60*1000,
Value: 1,
},
},
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []missInterval{
{
start: 1675115596722,
end: 1675115596722 + 60*60*1000 - 1,
},
{
start: 1675115596722 + 120*60*1000 + 1,
end: 1675115596722 + 180*60*1000,
},
},
},
{
name: "cached time range is a superset of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722,
Value: 1,
},
{
Timestamp: 1675115596722 + 60*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 120*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 180*60*1000,
Value: 1,
},
},
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []missInterval{},
},
{
name: "cache time range is a left overlap of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722,
Value: 1,
},
{
Timestamp: 1675115596722 + 60*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 120*60*1000,
Value: 1,
},
},
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []missInterval{
{
start: 1675115596722 + 120*60*1000 + 1,
end: 1675115596722 + 180*60*1000,
},
},
},
{
name: "cache time range is a right overlap of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722 + 60*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 120*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 180*60*1000,
Value: 1,
},
},
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []missInterval{
{
start: 1675115596722,
end: 1675115596722 + 60*60*1000 - 1,
},
},
},
{
name: "cache time range is a disjoint of the requested time range",
requestedStart: 1675115596722,
requestedEnd: 1675115596722 + 180*60*1000,
cachedSeries: []*v3.Series{
{
Labels: map[string]string{
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{
Timestamp: 1675115596722 + 240*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 300*60*1000,
Value: 1,
},
{
Timestamp: 1675115596722 + 360*60*1000,
Value: 1,
},
},
},
},
fluxInterval: 5 * time.Minute,
expectedMiss: []missInterval{
{
start: 1675115596722,
end: 1675115596722 + 180*60*1000,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.cachedSeries, tc.fluxInterval)
if len(misses) != len(tc.expectedMiss) {
t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses))
}
for i, miss := range misses {
if miss.start != tc.expectedMiss[i].start {
t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start)
}
if miss.end != tc.expectedMiss[i].end {
t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end)
}
}
})
}
}
func TestQueryRange(t *testing.T) {
params := []*v3.QueryRangeParamsV3{
{
Start: 1675115596722,
End: 1675115596722 + 120*60*1000,
Step: 5 * time.Minute.Microseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "method", IsColumn: false},
Operator: "=",
Value: "GET",
},
},
},
GroupBy: []v3.AttributeKey{
{Key: "service_name", IsColumn: false},
{Key: "method", IsColumn: false},
},
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
},
},
},
},
{
Start: 1675115596722 + 60*60*1000,
End: 1675115596722 + 180*60*1000,
Step: 5 * time.Minute.Microseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "method", IsColumn: false},
Operator: "=",
Value: "GET",
},
},
},
GroupBy: []v3.AttributeKey{
{Key: "service_name", IsColumn: false},
{Key: "method", IsColumn: false},
},
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
},
},
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
opts := QuerierOptions{
Cache: cache,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
TestingMode: true,
ReturnedSeries: []*v3.Series{
{
Labels: map[string]string{
"method": "GET",
"service_name": "test",
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{Timestamp: 1675115596722, Value: 1},
{Timestamp: 1675115596722 + 60*60*1000, Value: 2},
{Timestamp: 1675115596722 + 120*60*1000, Value: 3},
},
},
},
}
q := NewQuerier(opts)
expectedTimeRangeInQueryString := []string{
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115596722, 1675115596722+120*60*1000),
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115596722+120*60*1000+1, 1675115596722+180*60*1000),
}
for i, param := range params {
_, err, errByName := q.QueryRange(context.Background(), param, nil, nil)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
if len(errByName) > 0 {
t.Errorf("expected no error, got %v", errByName)
}
if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) {
t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i])
}
}
}

@@ -5,6 +5,7 @@ import (
"strings"
"github.com/SigNoz/govaluate"
"go.signoz.io/signoz/pkg/query-service/cache"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
@@ -192,3 +193,82 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
}
return queries, nil
}
// cacheKeyGenerator implements the cache.KeyGenerator interface
type cacheKeyGenerator struct {
}
func expressionToKey(expression *govaluate.EvaluableExpression, keys map[string]string) string {
var modified []govaluate.ExpressionToken
tokens := expression.Tokens()
for idx := range tokens {
token := tokens[idx]
if token.Kind == govaluate.VARIABLE {
token.Value = keys[fmt.Sprintf("%s", token.Value)]
token.Meta = keys[fmt.Sprintf("%s", token.Meta)]
}
modified = append(modified, token)
}
// err should be nil here since the expression is already validated
formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified)
return formula.ExpressionString()
}
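
A sketch of the intent, under the assumption that query A already has a generated key: the variable token's value is replaced by that key, so two formulas over identical sub-queries share a cache key.

expr, _ := govaluate.NewEvaluableExpressionWithFunctions("A/2", EvalFuncs)
key := expressionToKey(expr, map[string]string{"A": "source=metrics&step=60&aggregate=sum_rate"})
// key renders the expression with the variable A replaced by A's cache key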
func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string {
keys := make(map[string]string)
// Build keys for each builder query
for queryName, query := range params.CompositeQuery.BuilderQueries {
if query.Expression == queryName {
var parts []string
// We need to build a unique cache key for the BuilderQuery
parts = append(parts, fmt.Sprintf("source=%s", query.DataSource))
parts = append(parts, fmt.Sprintf("step=%d", query.StepInterval))
parts = append(parts, fmt.Sprintf("aggregate=%s", query.AggregateOperator))
if query.AggregateAttribute.Key != "" {
parts = append(parts, fmt.Sprintf("aggregateAttribute=%s", query.AggregateAttribute.CacheKey()))
}
if query.Filters != nil && len(query.Filters.Items) > 0 {
for idx, filter := range query.Filters.Items {
parts = append(parts, fmt.Sprintf("filter-%d=%s", idx, filter.CacheKey()))
}
}
if len(query.GroupBy) > 0 {
for idx, groupBy := range query.GroupBy {
parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
}
}
if len(query.Having) > 0 {
for idx, having := range query.Having {
parts = append(parts, fmt.Sprintf("having-%d=%s", idx, having.CacheKey()))
}
}
key := strings.Join(parts, "&")
keys[queryName] = key
}
}
// Build keys for each expression
for _, query := range params.CompositeQuery.BuilderQueries {
if query.Expression != query.QueryName {
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs)
expressionCacheKey := expressionToKey(expression, keys)
keys[query.QueryName] = expressionCacheKey
}
}
return keys
}
func NewKeyGenerator() cache.KeyGenerator {
return &cacheKeyGenerator{}
}
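
For a builder query like query A in the querier tests, the generated key concatenates the parts with & (a sketch; the exact enum strings come from the v3 model types):

// given a *v3.QueryRangeParamsV3 params
keys := NewKeyGenerator().GenerateKeys(params)
// keys["A"] has the shape:
//   source=<datasource>&step=<interval>&aggregate=<operator>&aggregateAttribute=<AttributeKey.CacheKey()>&filter-0=<FilterItem.CacheKey()>&groupBy-0=...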

@@ -94,3 +94,10 @@ type Reader interface {
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
CheckClickHouse(ctx context.Context) error
}
type Querier interface {
QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey, map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string)
// test helpers
QueriesExecuted() []string
}

@@ -3,6 +3,7 @@ package v3
import (
"encoding/json"
"fmt"
"sort"
"strconv"
"time"
@@ -283,6 +284,10 @@ type AttributeKey struct {
IsColumn bool `json:"isColumn"`
}
func (a AttributeKey) CacheKey() string {
return fmt.Sprintf("%s-%s-%s-%t", a.Key, a.DataType, a.Type, a.IsColumn)
}
func (a AttributeKey) Validate() error {
switch a.DataType {
case AttributeKeyDataTypeBool, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeString, AttributeKeyDataTypeUnspecified:
@@ -319,6 +324,7 @@ type QueryRangeParamsV3 struct {
Step int64 `json:"step"`
CompositeQuery *CompositeQuery `json:"compositeQuery"`
Variables map[string]interface{} `json:"variables,omitempty"`
NoCache bool `json:"noCache"`
}
type PromQuery struct {
@@ -534,6 +540,10 @@ type FilterItem struct {
Operator FilterOperator `json:"op"`
}
func (f *FilterItem) CacheKey() string {
return fmt.Sprintf("key:%s,op:%s,value:%v", f.Key.CacheKey(), f.Operator, f.Value)
}
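
For instance, the filter used in the querier tests above serializes as:

f := FilterItem{Key: AttributeKey{Key: "method"}, Operator: "=", Value: "GET"}
_ = f.CacheKey() // "key:method---false,op:=,value:GET"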
type OrderBy struct {
ColumnName string `json:"columnName"`
Order string `json:"order"`
@@ -545,6 +555,10 @@ type Having struct {
Value interface{} `json:"value"`
}
func (h *Having) CacheKey() string {
return fmt.Sprintf("column:%s,op:%s,value:%v", h.ColumnName, h.Operator, h.Value)
}
type QueryRangeResponse struct {
ResultType string `json:"resultType"`
Result []*Result `json:"result"`
@@ -561,6 +575,12 @@ type Series struct {
Points []Point `json:"values"`
}
func (s *Series) SortPoints() {
sort.Slice(s.Points, func(i, j int) bool {
return s.Points[i].Timestamp < s.Points[j].Timestamp
})
}
type Row struct {
Timestamp time.Time `json:"timestamp"`
Data map[string]interface{} `json:"data"`
@@ -577,6 +597,21 @@ func (p *Point) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]interface{}{"timestamp": p.Timestamp, "value": v})
}
// UnmarshalJSON implements json.Unmarshaler.
func (p *Point) UnmarshalJSON(data []byte) error {
var v struct {
Timestamp int64 `json:"timestamp"`
Value string `json:"value"`
}
if err := json.Unmarshal(data, &v); err != nil {
return err
}
p.Timestamp = v.Timestamp
var err error
p.Value, err = strconv.ParseFloat(v.Value, 64)
return err
}
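
This mirrors MarshalJSON above, which writes the value as a string; a round-trip sketch:

var p Point
_ = json.Unmarshal([]byte(`{"timestamp": 1675115596722, "value": "1.5"}`), &p)
// p.Timestamp == 1675115596722, p.Value == 1.5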
// ExploreQuery is a query for the explore page
// It is a composite query with a source page name
// The source page name is used to identify the page that initiated the query