From 04a9de1e32c6403f7da57e3ddd83b557872a2c42 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 8 Jun 2023 15:46:18 +0530 Subject: [PATCH] feat: add querier interface and initial implementation (#2782) --- Makefile | 1 + .../app/metrics/v3/query_builder.go | 11 + pkg/query-service/app/querier/querier.go | 426 +++++++++++++++ pkg/query-service/app/querier/querier_test.go | 507 ++++++++++++++++++ .../app/queryBuilder/query_builder.go | 80 +++ pkg/query-service/interfaces/interface.go | 11 +- pkg/query-service/model/v3/v3.go | 35 ++ 7 files changed, 1069 insertions(+), 2 deletions(-) create mode 100644 pkg/query-service/app/querier/querier.go create mode 100644 pkg/query-service/app/querier/querier_test.go diff --git a/Makefile b/Makefile index c08b0c5f85..0c22db72d6 100644 --- a/Makefile +++ b/Makefile @@ -140,5 +140,6 @@ test: go test ./pkg/query-service/app/metrics/... go test ./pkg/query-service/cache/... go test ./pkg/query-service/app/... + go test ./pkg/query-service/app/querier/... go test ./pkg/query-service/converter/... go test ./pkg/query-service/formatter/... diff --git a/pkg/query-service/app/metrics/v3/query_builder.go b/pkg/query-service/app/metrics/v3/query_builder.go index 82995d9285..dc5aadb618 100644 --- a/pkg/query-service/app/metrics/v3/query_builder.go +++ b/pkg/query-service/app/metrics/v3/query_builder.go @@ -3,8 +3,10 @@ package v3 import ( "fmt" "strings" + "time" "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/utils" ) @@ -403,3 +405,12 @@ func PrepareMetricQuery(start, end int64, queryType v3.QueryType, panelType v3.P } return query, err } + +func BuildPromQuery(promQuery *v3.PromQuery, step, start, end int64) *model.QueryRangeParams { + return &model.QueryRangeParams{ + Query: promQuery.Query, + Start: time.UnixMilli(start), + End: time.UnixMilli(end), + Step: time.Duration(step * int64(time.Second)), + } +} diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go new file mode 100644 index 0000000000..3a47ccf3a8 --- /dev/null +++ b/pkg/query-service/app/querier/querier.go @@ -0,0 +1,426 @@ +package querier + +import ( + "context" + "encoding/json" + "fmt" + "math" + "sort" + "strings" + "time" + + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + + "go.signoz.io/signoz/pkg/query-service/cache" + "go.signoz.io/signoz/pkg/query-service/cache/status" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/zap" +) + +type missInterval struct { + start, end int64 // in milliseconds +} + +type querier struct { + cache cache.Cache + reader interfaces.Reader + keyGenerator cache.KeyGenerator + + fluxInterval time.Duration + + // used for testing + // TODO(srikanthccv): remove this once we have a proper mock + testingMode bool + queriesExecuted []string + returnedSeries []*v3.Series + returnedErr error +} + +type QuerierOptions struct { + Reader interfaces.Reader + Cache cache.Cache + KeyGenerator cache.KeyGenerator + FluxInterval time.Duration + + // used for testing + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error +} + +func NewQuerier(opts QuerierOptions) interfaces.Querier { + return &querier{ + cache: 
opts.Cache,
+		reader:       opts.Reader,
+		keyGenerator: opts.KeyGenerator,
+		fluxInterval: opts.FluxInterval,
+
+		testingMode:    opts.TestingMode,
+		returnedSeries: opts.ReturnedSeries,
+		returnedErr:    opts.ReturnedErr,
+	}
+}
+
+func (q *querier) execClickHouseQuery(ctx context.Context, query string) ([]*v3.Series, error) {
+	q.queriesExecuted = append(q.queriesExecuted, query)
+	if q.testingMode && q.reader == nil {
+		return q.returnedSeries, q.returnedErr
+	}
+	return q.reader.GetTimeSeriesResultV3(ctx, query)
+}
+
+func (q *querier) execPromQuery(ctx context.Context, params *model.QueryRangeParams) ([]*v3.Series, error) {
+	q.queriesExecuted = append(q.queriesExecuted, params.Query)
+	if q.testingMode && q.reader == nil {
+		return q.returnedSeries, q.returnedErr
+	}
+	promResult, _, err := q.reader.GetQueryRangeResult(ctx, params)
+	if err != nil {
+		return nil, err
+	}
+	matrix, promErr := promResult.Matrix()
+	if promErr != nil {
+		return nil, promErr
+	}
+	var seriesList []*v3.Series
+	for _, v := range matrix {
+		var s v3.Series
+		s.Labels = v.Metric.Copy().Map()
+		for idx := range v.Points {
+			p := v.Points[idx]
+			s.Points = append(s.Points, v3.Point{Timestamp: p.T, Value: p.V})
+		}
+		seriesList = append(seriesList, &s)
+	}
+	return seriesList, nil
+}
+
+// findMissingTimeRanges finds the missing time ranges in the seriesList
+// and returns them as a list of miss structs. It takes the fluxInterval
+// into account when computing the missing time ranges.
+//
+// The [End - fluxInterval, End] is always added to the list of misses, because
+// the data might still be in flux and not yet available in the database.
+func findMissingTimeRanges(start, end int64, seriesList []*v3.Series, fluxInterval time.Duration) (misses []missInterval) {
+	var cachedStart, cachedEnd int64
+	for idx := range seriesList {
+		series := seriesList[idx]
+		for pointIdx := range series.Points {
+			point := series.Points[pointIdx]
+			if cachedStart == 0 || point.Timestamp < cachedStart {
+				cachedStart = point.Timestamp
+			}
+			if cachedEnd == 0 || point.Timestamp > cachedEnd {
+				cachedEnd = point.Timestamp
+			}
+		}
+	}
+
+	// Exclude the flux interval from the cached end time
+	cachedEnd = int64(
+		math.Min(
+			float64(cachedEnd),
+			float64(time.Now().UnixMilli()-fluxInterval.Milliseconds()),
+		),
+	)
+
+	// There are five cases to consider
+	// 1. Cached time range is a subset of the requested time range
+	// 2. Cached time range is a superset of the requested time range
+	// 3. Cached time range is a left overlap of the requested time range
+	// 4. Cached time range is a right overlap of the requested time range
+	// 5.
Cached time range is a disjoint of the requested time range
+	if cachedStart >= start && cachedEnd <= end {
+		// Case 1: Cached time range is a subset of the requested time range
+		// Add misses for the left and right sides of the cached time range
+		misses = append(misses, missInterval{start: start, end: cachedStart - 1})
+		misses = append(misses, missInterval{start: cachedEnd + 1, end: end})
+	} else if cachedStart <= start && cachedEnd >= end {
+		// Case 2: Cached time range is a superset of the requested time range
+		// No misses
+	} else if cachedStart <= start && cachedEnd >= start {
+		// Case 3: Cached time range is a left overlap of the requested time range
+		// Add a miss for the left side of the cached time range
+		misses = append(misses, missInterval{start: cachedEnd + 1, end: end})
+	} else if cachedStart <= end && cachedEnd >= end {
+		// Case 4: Cached time range is a right overlap of the requested time range
+		// Add a miss for the right side of the cached time range
+		misses = append(misses, missInterval{start: start, end: cachedStart - 1})
+	} else {
+		// Case 5: Cached time range is a disjoint of the requested time range
+		// Add a miss for the entire requested time range
+		misses = append(misses, missInterval{start: start, end: end})
+	}
+
+	// remove the structs with start > end
+	var validMisses []missInterval
+	for idx := range misses {
+		miss := misses[idx]
+		if miss.start <= miss.end {
+			validMisses = append(validMisses, miss)
+		}
+	}
+	return validMisses
+}
+
+// findMissingTimeRanges finds the missing time ranges in the cached data
+// and returns them as a list of misses
+func (q *querier) findMissingTimeRanges(start, end, step int64, cachedData []byte) (misses []missInterval) {
+	var cachedSeriesList []*v3.Series
+	if err := json.Unmarshal(cachedData, &cachedSeriesList); err != nil {
+		// In case of error, we return the entire range as a miss
+		return []missInterval{{start: start, end: end}}
+	}
+	return findMissingTimeRanges(start, end, cachedSeriesList, q.fluxInterval)
+}
+
+func labelsToString(labels map[string]string) string {
+	type label struct {
+		Key   string
+		Value string
+	}
+	var labelsList []label
+	for k, v := range labels {
+		labelsList = append(labelsList, label{Key: k, Value: v})
+	}
+	sort.Slice(labelsList, func(i, j int) bool {
+		return labelsList[i].Key < labelsList[j].Key
+	})
+	labelKVs := make([]string, len(labelsList))
+	for idx := range labelsList {
+		labelKVs[idx] = labelsList[idx].Key + "=" + labelsList[idx].Value
+	}
+	return fmt.Sprintf("{%s}", strings.Join(labelKVs, ","))
+}
+
+func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
+	// Merge the missed series with the cached series by timestamp
+	mergedSeries := make([]*v3.Series, 0)
+	seriesesByLabels := make(map[string]*v3.Series)
+	for idx := range cachedSeries {
+		series := cachedSeries[idx]
+		seriesesByLabels[labelsToString(series.Labels)] = series
+	}
+
+	for idx := range missedSeries {
+		series := missedSeries[idx]
+		if _, ok := seriesesByLabels[labelsToString(series.Labels)]; !ok {
+			seriesesByLabels[labelsToString(series.Labels)] = series
+			continue
+		}
+		seriesesByLabels[labelsToString(series.Labels)].Points = append(seriesesByLabels[labelsToString(series.Labels)].Points, series.Points...)
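+		// For example, a cached series and a missed series that both carry
+		// labels {method=GET,service_name=api} resolve to the same map key
+		// here, because labelsToString sorts label names before joining them;
+		// their points are concatenated and then ordered by SortPoints below.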
+ } + // Sort the points in each series by timestamp + for idx := range seriesesByLabels { + series := seriesesByLabels[idx] + series.SortPoints() + mergedSeries = append(mergedSeries, series) + } + return mergedSeries +} + +func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, fields map[string]v3.AttributeKey, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) { + + cacheKeys := q.keyGenerator.GenerateKeys(params) + + seriesList := make([]*v3.Series, 0) + errQueriesByName := make(map[string]string) + var err error + + for queryName, builderQuery := range params.CompositeQuery.BuilderQueries { + + // TODO: add support for logs and traces + if builderQuery.DataSource == v3.DataSourceLogs { + query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, fields) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + seriesList = append(seriesList, series...) + continue + } + + if builderQuery.DataSource == v3.DataSourceTraces { + query, err := tracesV3.PrepareTracesQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, keys) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + seriesList = append(seriesList, series...) + continue + } + + cacheKey := cacheKeys[queryName] + var cachedData []byte + if !params.NoCache { + var retrieveStatus status.RetrieveStatus + cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true) + zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + if err != nil { + return nil, err, nil + } + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + query, err := metricsV3.PrepareMetricQuery( + miss.start, + miss.end, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + ) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + errQueriesByName[queryName] = err.Error() + continue + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + seriesList = append(seriesList, mergedSeries...) 
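+		// The merged result is written back to the cache below (only when
+		// something new was fetched), so a follow-up request over an
+		// overlapping window only has to query the sub-ranges reported by
+		// findMissingTimeRanges.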
+ // Cache the seriesList for future queries + if len(missedSeries) > 0 { + mergedSeriesData, err := json.Marshal(mergedSeries) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + } + } + if len(errQueriesByName) > 0 { + err = fmt.Errorf("error in builder queries") + } + return seriesList, err, errQueriesByName +} + +func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) { + seriesList := make([]*v3.Series, 0) + errQueriesByName := make(map[string]string) + var err error + for queryName, promQuery := range params.CompositeQuery.PromQueries { + cacheKey := q.keyGenerator.GenerateKeys(params)[queryName] + var cachedData []byte + var retrieveStatus status.RetrieveStatus + if !params.NoCache { + cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true) + zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + } + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + query := metricsV3.BuildPromQuery( + promQuery, + params.Step, + miss.start, + miss.end, + ) + series, err := q.execPromQuery(ctx, query) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + errQueriesByName[queryName] = err.Error() + continue + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + seriesList = append(seriesList, mergedSeries...) + // Cache the seriesList for future queries + if len(missedSeries) > 0 { + mergedSeriesData, err := json.Marshal(mergedSeries) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + } + } + if len(errQueriesByName) > 0 { + err = fmt.Errorf("error in prom queries") + } + return seriesList, err, errQueriesByName +} + +func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) { + seriesList := make([]*v3.Series, 0) + errQueriesByName := make(map[string]string) + var err error + for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries { + series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query) + if err != nil { + errQueriesByName[queryName] = err.Error() + continue + } + seriesList = append(seriesList, series...) 
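+		// Unlike builder and PromQL queries, raw ClickHouse queries are opaque
+		// to the querier: no cache key or missing-range computation applies,
+		// so each query runs in full on every request.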
+ } + if len(errQueriesByName) > 0 { + err = fmt.Errorf("error in clickhouse queries") + } + return seriesList, err, errQueriesByName +} + +func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, fields map[string]v3.AttributeKey, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) { + var seriesList []*v3.Series + var err error + var errQueriesByName map[string]string + if params.CompositeQuery != nil { + switch params.CompositeQuery.QueryType { + case v3.QueryTypeBuilder: + seriesList, err, errQueriesByName = q.runBuilderQueries(ctx, params, fields, keys) + case v3.QueryTypePromQL: + seriesList, err, errQueriesByName = q.runPromQueries(ctx, params) + case v3.QueryTypeClickHouseSQL: + seriesList, err, errQueriesByName = q.runClickHouseQueries(ctx, params) + default: + err = fmt.Errorf("invalid query type") + } + } + return seriesList, err, errQueriesByName +} + +func (q *querier) QueriesExecuted() []string { + return q.queriesExecuted +} diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go new file mode 100644 index 0000000000..a9f7d3d8ae --- /dev/null +++ b/pkg/query-service/app/querier/querier_test.go @@ -0,0 +1,507 @@ +package querier + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" + "go.signoz.io/signoz/pkg/query-service/cache/inmemory" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestFindMissingTimeRangesZeroFreshNess(t *testing.T) { + // There are five scenarios: + // 1. Cached time range is a subset of the requested time range + // 2. Cached time range is a superset of the requested time range + // 3. Cached time range is a left overlap of the requested time range + // 4. Cached time range is a right overlap of the requested time range + // 5. 
Cached time range is a disjoint of the requested time range + testCases := []struct { + name string + requestedStart int64 // in milliseconds + requestedEnd int64 // in milliseconds + cachedSeries []*v3.Series + expectedMiss []missInterval + }{ + { + name: "cached time range is a subset of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 60*60*1000 - 1, + }, + { + start: 1675115596722 + 120*60*1000 + 1, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + { + name: "cached time range is a superset of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722, + Value: 1, + }, + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 180*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{}, + }, + { + name: "cached time range is a left overlap of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722, + Value: 1, + }, + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{ + { + start: 1675115596722 + 120*60*1000 + 1, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + { + name: "cached time range is a right overlap of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 180*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 60*60*1000 - 1, + }, + }, + }, + { + name: "cached time range is a disjoint of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 240*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 300*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 360*60*1000, + Value: 1, + }, + }, + }, + }, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.cachedSeries, 0*time.Minute) + if len(misses) != len(tc.expectedMiss) { + t.Errorf("expected %d 
misses, got %d", len(tc.expectedMiss), len(misses)) + } + for i, miss := range misses { + if miss.start != tc.expectedMiss[i].start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + } + if miss.end != tc.expectedMiss[i].end { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + } + } + }) + } +} + +func TestFindMissingTimeRangesWithFluxInterval(t *testing.T) { + + testCases := []struct { + name string + requestedStart int64 + requestedEnd int64 + cachedSeries []*v3.Series + fluxInterval time.Duration + expectedMiss []missInterval + }{ + { + name: "cached time range is a subset of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 60*60*1000 - 1, + }, + { + start: 1675115596722 + 120*60*1000 + 1, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + { + name: "cached time range is a superset of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722, + Value: 1, + }, + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 180*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{}, + }, + { + name: "cache time range is a left overlap of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722, + Value: 1, + }, + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{ + { + start: 1675115596722 + 120*60*1000 + 1, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + { + name: "cache time range is a right overlap of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 60*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 120*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 180*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 60*60*1000 - 1, + }, + }, + }, + { + name: "cache time range is a disjoint of the requested time range", + requestedStart: 1675115596722, + requestedEnd: 1675115596722 + 180*60*1000, + cachedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + { + Timestamp: 1675115596722 + 240*60*1000, + Value: 1, + }, + { + Timestamp: 
1675115596722 + 300*60*1000, + Value: 1, + }, + { + Timestamp: 1675115596722 + 360*60*1000, + Value: 1, + }, + }, + }, + }, + fluxInterval: 5 * time.Minute, + expectedMiss: []missInterval{ + { + start: 1675115596722, + end: 1675115596722 + 180*60*1000, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + misses := findMissingTimeRanges(tc.requestedStart, tc.requestedEnd, tc.cachedSeries, tc.fluxInterval) + if len(misses) != len(tc.expectedMiss) { + t.Errorf("expected %d misses, got %d", len(tc.expectedMiss), len(misses)) + } + for i, miss := range misses { + if miss.start != tc.expectedMiss[i].start { + t.Errorf("expected start %d, got %d", tc.expectedMiss[i].start, miss.start) + } + if miss.end != tc.expectedMiss[i].end { + t.Errorf("expected end %d, got %d", tc.expectedMiss[i].end, miss.end) + } + } + }) + } +} + +func TestQueryRange(t *testing.T) { + params := []*v3.QueryRangeParamsV3{ + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 5 * time.Minute.Microseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "service_name", IsColumn: false}, + {Key: "method", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "A", + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 180*60*1000, + Step: 5 * time.Minute.Microseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "service_name", IsColumn: false}, + {Key: "method", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "A", + }, + }, + }, + }, + } + cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) + opts := QuerierOptions{ + Cache: cache, + Reader: nil, + FluxInterval: 5 * time.Minute, + KeyGenerator: queryBuilder.NewKeyGenerator(), + + TestingMode: true, + ReturnedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "method": "GET", + "service_name": "test", + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + {Timestamp: 1675115596722, Value: 1}, + {Timestamp: 1675115596722 + 60*60*1000, Value: 2}, + {Timestamp: 1675115596722 + 120*60*1000, Value: 3}, + }, + }, + }, + } + q := NewQuerier(opts) + expectedTimeRangeInQueryString := []string{ + fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115596722, 1675115596722+120*60*1000), + fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115596722+120*60*1000+1, 1675115596722+180*60*1000), + } + + for i, param := range params { + _, err, errByName := q.QueryRange(context.Background(), param, nil, nil) 
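+		// The first request caches [start, start+120m]; the second request
+		// overlaps it, so only the uncached tail (start+120m+1, start+180m]
+		// should appear in the executed query, as asserted below via
+		// expectedTimeRangeInQueryString.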
+		if err != nil {
+			t.Errorf("expected no error, got %s", err)
+		}
+		if len(errByName) > 0 {
+			t.Errorf("expected no error, got %v", errByName)
+		}
+
+		if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) {
+			t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i])
+		}
+	}
+}
diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go
index 173a139aa9..ee7d7c8ef0 100644
--- a/pkg/query-service/app/queryBuilder/query_builder.go
+++ b/pkg/query-service/app/queryBuilder/query_builder.go
@@ -5,6 +5,7 @@ import (
 	"strings"
 
 	"github.com/SigNoz/govaluate"
+	"go.signoz.io/signoz/pkg/query-service/cache"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 	"go.uber.org/zap"
 )
@@ -192,3 +193,82 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
 	}
 	return queries, nil
 }
+
+// cacheKeyGenerator implements the cache.KeyGenerator interface
+type cacheKeyGenerator struct {
+}
+
+func expressionToKey(expression *govaluate.EvaluableExpression, keys map[string]string) string {
+
+	var modified []govaluate.ExpressionToken
+	tokens := expression.Tokens()
+	for idx := range tokens {
+		token := tokens[idx]
+		if token.Kind == govaluate.VARIABLE {
+			token.Value = keys[fmt.Sprintf("%s", token.Value)]
+			token.Meta = keys[fmt.Sprintf("%s", token.Meta)]
+		}
+		modified = append(modified, token)
+	}
+	// err should be nil here since the expression is already validated
+	formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified)
+	return formula.ExpressionString()
+}
+
+func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string {
+	keys := make(map[string]string)
+
+	// Build keys for each builder query
+	for queryName, query := range params.CompositeQuery.BuilderQueries {
+		if query.Expression == queryName {
+			var parts []string
+
+			// We need to build a unique cache key for each BuilderQuery
+
+			parts = append(parts, fmt.Sprintf("source=%s", query.DataSource))
+			parts = append(parts, fmt.Sprintf("step=%d", query.StepInterval))
+			parts = append(parts, fmt.Sprintf("aggregate=%s", query.AggregateOperator))
+
+			if query.AggregateAttribute.Key != "" {
+				parts = append(parts, fmt.Sprintf("aggregateAttribute=%s", query.AggregateAttribute.CacheKey()))
+			}
+
+			if query.Filters != nil && len(query.Filters.Items) > 0 {
+				for idx, filter := range query.Filters.Items {
+					parts = append(parts, fmt.Sprintf("filter-%d=%s", idx, filter.CacheKey()))
+				}
+			}
+
+			if len(query.GroupBy) > 0 {
+				for idx, groupBy := range query.GroupBy {
+					parts = append(parts, fmt.Sprintf("groupBy-%d=%s", idx, groupBy.CacheKey()))
+				}
+			}
+
+			if len(query.Having) > 0 {
+				for idx, having := range query.Having {
+					parts = append(parts, fmt.Sprintf("having-%d=%s", idx, having.CacheKey()))
+				}
+			}
+
+			key := strings.Join(parts, "&")
+			keys[queryName] = key
+		}
+	}
+
+	// Build keys for each expression
+	for _, query := range params.CompositeQuery.BuilderQueries {
+		if query.Expression != query.QueryName {
+			expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs)
+
+			expressionCacheKey := expressionToKey(expression, keys)
+			keys[query.QueryName] = expressionCacheKey
+		}
+	}
+
+	return keys
+}
+
+func NewKeyGenerator() cache.KeyGenerator {
+	return &cacheKeyGenerator{}
+}
diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go
index cdc1f3387d..c6e0d7e25f 100644
---
a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -36,9 +36,9 @@ type Reader interface { GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) - GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) + GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) - GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) + GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError) GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError) GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) @@ -94,3 +94,10 @@ type Reader interface { QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error) CheckClickHouse(ctx context.Context) error } + +type Querier interface { + QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey, map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) + + // test helpers + QueriesExecuted() []string +} diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go index d2f8f45fa5..a29a6cd93f 100644 --- a/pkg/query-service/model/v3/v3.go +++ b/pkg/query-service/model/v3/v3.go @@ -3,6 +3,7 @@ package v3 import ( "encoding/json" "fmt" + "sort" "strconv" "time" @@ -283,6 +284,10 @@ type AttributeKey struct { IsColumn bool `json:"isColumn"` } +func (a AttributeKey) CacheKey() string { + return fmt.Sprintf("%s-%s-%s-%t", a.Key, a.DataType, a.Type, a.IsColumn) +} + func (a AttributeKey) Validate() error { switch a.DataType { case AttributeKeyDataTypeBool, AttributeKeyDataTypeInt64, AttributeKeyDataTypeFloat64, AttributeKeyDataTypeString, AttributeKeyDataTypeUnspecified: @@ -319,6 +324,7 @@ type QueryRangeParamsV3 struct { Step int64 `json:"step"` CompositeQuery *CompositeQuery `json:"compositeQuery"` Variables map[string]interface{} `json:"variables,omitempty"` + NoCache bool `json:"noCache"` } type PromQuery struct { @@ -534,6 +540,10 @@ type FilterItem struct { Operator FilterOperator `json:"op"` } +func (f *FilterItem) CacheKey() string { + return fmt.Sprintf("key:%s,op:%s,value:%v", f.Key.CacheKey(), f.Operator, f.Value) +} + type OrderBy struct { ColumnName string `json:"columnName"` Order string `json:"order"` @@ -545,6 +555,10 @@ type Having struct { Value interface{} `json:"value"` } +func (h *Having) CacheKey() string { + return fmt.Sprintf("column:%s,op:%s,value:%v", h.ColumnName, h.Operator, h.Value) +} + type QueryRangeResponse struct { ResultType string `json:"resultType"` Result []*Result `json:"result"` @@ -561,6 +575,12 @@ type Series struct { Points []Point `json:"values"` } +func (s *Series) SortPoints() { + sort.Slice(s.Points, func(i, j int) bool { + return s.Points[i].Timestamp < s.Points[j].Timestamp + }) +} + type Row struct { Timestamp time.Time `json:"timestamp"` Data map[string]interface{} 
`json:"data"` @@ -577,6 +597,21 @@ func (p *Point) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]interface{}{"timestamp": p.Timestamp, "value": v}) } +// UnmarshalJSON implements json.Unmarshaler. +func (p *Point) UnmarshalJSON(data []byte) error { + var v struct { + Timestamp int64 `json:"timestamp"` + Value string `json:"value"` + } + if err := json.Unmarshal(data, &v); err != nil { + return err + } + p.Timestamp = v.Timestamp + var err error + p.Value, err = strconv.ParseFloat(v.Value, 64) + return err +} + // ExploreQuery is a query for the explore page // It is a composite query with a source page name // The source page name is used to identify the page that initiated the query
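
Usage sketch (illustrative only, not part of this patch): wiring the new
querier into a caller. Here reader (any interfaces.Reader implementation),
ctx, and params (*v3.QueryRangeParamsV3) are assumed to exist; the cache and
key generator mirror the setup used in querier_test.go.

	import (
		"time"

		"go.signoz.io/signoz/pkg/query-service/app/querier"
		"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
		"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
	)

	q := querier.NewQuerier(querier.QuerierOptions{
		Reader:       reader, // assumed interfaces.Reader implementation
		Cache:        inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}),
		KeyGenerator: queryBuilder.NewKeyGenerator(),
		FluxInterval: 5 * time.Minute,
	})
	// QueryRange returns the series list, an aggregate error, and a map of
	// per-query error messages keyed by query name.
	seriesList, err, errQueriesByName := q.QueryRange(ctx, params, nil, nil)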