Query range caching (#2142)

Srikanth Chekuri authored on 2023-09-17 10:40:45 +05:30; committed via GitHub
parent 4227faa6b5
commit 416a058eab
18 changed files with 848 additions and 207 deletions

View File

@@ -19,7 +19,7 @@ LOCAL_GOOS ?= $(shell go env GOOS)
 LOCAL_GOARCH ?= $(shell go env GOARCH)
 REPONAME ?= signoz
-DOCKER_TAG ?= latest
+DOCKER_TAG ?= $(subst v,,$(BUILD_VERSION))
 FRONTEND_DOCKER_IMAGE ?= frontend
 QUERY_SERVICE_DOCKER_IMAGE ?= query-service

View File

@@ -10,6 +10,7 @@ import (
 	"go.signoz.io/signoz/ee/query-service/license"
 	baseapp "go.signoz.io/signoz/pkg/query-service/app"
 	"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
+	"go.signoz.io/signoz/pkg/query-service/cache"
 	baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
 	basemodel "go.signoz.io/signoz/pkg/query-service/model"
 	rules "go.signoz.io/signoz/pkg/query-service/rules"
@@ -29,6 +30,9 @@ type APIHandlerOptions struct {
 	FeatureFlags                  baseint.FeatureLookup
 	LicenseManager                *license.Manager
 	LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
+	Cache                         cache.Cache
+	// Querier Flux Interval
+	FluxInterval time.Duration
 }

 type APIHandler struct {
@@ -51,6 +55,8 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
 		RuleManager:                   opts.RulesManager,
 		FeatureFlags:                  opts.FeatureFlags,
 		LogsParsingPipelineController: opts.LogsParsingPipelineController,
+		Cache:                         opts.Cache,
+		FluxInterval:                  opts.FluxInterval,
 	})

 	if err != nil {

View File

@@ -35,6 +35,7 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/app/opamp"
 	opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
 	baseauth "go.signoz.io/signoz/pkg/query-service/auth"
+	"go.signoz.io/signoz/pkg/query-service/cache"
 	baseconst "go.signoz.io/signoz/pkg/query-service/constants"
 	"go.signoz.io/signoz/pkg/query-service/healthcheck"
 	basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
@@ -62,6 +63,8 @@ type ServerOptions struct {
 	MaxIdleConns    int
 	MaxOpenConns    int
 	DialTimeout     time.Duration
+	CacheConfigPath string
+	FluxInterval    string
 }

 // Server runs HTTP api service
@@ -189,6 +192,21 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 	telemetry.GetInstance().SetReader(reader)

+	var c cache.Cache
+	if serverOptions.CacheConfigPath != "" {
+		cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
+		if err != nil {
+			return nil, err
+		}
+		c = cache.NewCache(cacheOpts)
+	}
+
+	fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
+	if err != nil {
+		return nil, err
+	}
+
 	apiOpts := api.APIHandlerOptions{
 		DataConnector: reader,
 		SkipConfig:    skipConfig,
@@ -202,6 +220,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		FeatureFlags:                  lm,
 		LicenseManager:                lm,
 		LogsParsingPipelineController: logParsingPipelineController,
+		Cache:                         c,
+		FluxInterval:                  fluxInterval,
 	}

 	apiHandler, err := api.NewAPIHandler(apiOpts)

View File

@@ -82,6 +82,7 @@ func main() {
 	// the url used to build link in the alert messages in slack and other systems
 	var ruleRepoURL string
+	var cacheConfigPath, fluxInterval string
 	var enableQueryServiceLogOTLPExport bool
 	var preferDelta bool
 	var preferSpanMetrics bool
@@ -99,10 +100,14 @@ func main() {
 	flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)")
 	flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)")
 	flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
+	flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
+	flag.StringVar(&fluxInterval, "flux-interval", "5m", "(flux interval)")
 	flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
 	flag.Parse()

 	loggerMgr := initZapLog(enableQueryServiceLogOTLPExport)
 	zap.ReplaceGlobals(loggerMgr)
 	defer loggerMgr.Sync() // flushes buffer, if any
@@ -121,6 +126,8 @@ func main() {
 		MaxIdleConns:    maxIdleConns,
 		MaxOpenConns:    maxOpenConns,
 		DialTimeout:     dialTimeout,
+		CacheConfigPath: cacheConfigPath,
+		FluxInterval:    fluxInterval,
 	}

 	// Read the jwt secret key
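
Both flags are opt-in; caching stays disabled unless a config file is supplied. Assuming the built query-service binary and the sample cache config added later in this commit, a hypothetical invocation would be:

./query-service --experimental.cache-config=./cache.yaml --flux-interval=5m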

View File

@@ -28,9 +28,11 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/app/metrics"
 	metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
 	"go.signoz.io/signoz/pkg/query-service/app/parser"
+	"go.signoz.io/signoz/pkg/query-service/app/querier"
 	"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
 	tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
 	"go.signoz.io/signoz/pkg/query-service/auth"
+	"go.signoz.io/signoz/pkg/query-service/cache"
 	"go.signoz.io/signoz/pkg/query-service/constants"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 	querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
@@ -53,6 +55,7 @@ type status string
 const (
 	statusSuccess status = "success"
 	statusError   status = "error"
+	defaultFluxInterval  = 5 * time.Minute
 )

 // NewRouter creates and configures a Gorilla Router.
@@ -73,6 +76,7 @@ type APIHandler struct {
 	ruleManager       *rules.Manager
 	featureFlags      interfaces.FeatureLookup
 	ready             func(http.HandlerFunc) http.HandlerFunc
+	querier           interfaces.Querier
 	queryBuilder      *queryBuilder.QueryBuilder
 	preferDelta       bool
 	preferSpanMetrics bool
@@ -114,6 +118,11 @@ type APIHandlerOpts struct {
 	// Log parsing pipelines
 	LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
+	// cache
+	Cache cache.Cache
+
+	// Querier Flux Interval
+	FluxInterval time.Duration
 }

 // NewAPIHandler returns an APIHandler
@@ -124,6 +133,16 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
 		return nil, err
 	}

+	querierOpts := querier.QuerierOptions{
+		Reader:        opts.Reader,
+		Cache:         opts.Cache,
+		KeyGenerator:  queryBuilder.NewKeyGenerator(),
+		FluxInterval:  opts.FluxInterval,
+		FeatureLookup: opts.FeatureFlags,
+	}
+
+	querier := querier.NewQuerier(querierOpts)
+
 	aH := &APIHandler{
 		reader: opts.Reader,
 		appDao: opts.AppDao,
@@ -137,6 +156,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
 		ruleManager:                   opts.RuleManager,
 		featureFlags:                  opts.FeatureFlags,
 		LogsParsingPipelineController: opts.LogsParsingPipelineController,
+		querier:                       querier,
 	}

 	builderOpts := queryBuilder.QueryBuilderOptions{
@@ -2965,9 +2985,8 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
 	var result []*v3.Result
 	var err error
 	var errQuriesByName map[string]string
-	var queries map[string]string
-	switch queryRangeParams.CompositeQuery.QueryType {
-	case v3.QueryTypeBuilder:
+	var spanKeys map[string]v3.AttributeKey
+	if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
 		// check if any enrichment is required for logs if yes then enrich them
 		if logsv3.EnrichmentRequired(queryRangeParams) {
 			// get the fields if any logs query is present
@@ -2981,41 +3000,15 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
 			logsv3.Enrich(queryRangeParams, fields)
 		}

-		var spanKeys map[string]v3.AttributeKey
 		spanKeys, err = aH.getSpanKeysV3(ctx, queryRangeParams)
 		if err != nil {
 			apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
 			RespondError(w, apiErrObj, errQuriesByName)
 			return
 		}
-
-		queries, err = aH.queryBuilder.PrepareQueries(queryRangeParams, spanKeys)
-		if err != nil {
-			RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
-			return
-		}
-		if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeList || queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTrace {
-			result, err, errQuriesByName = aH.execClickHouseListQueries(r.Context(), queries)
-		} else {
-			result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries)
-		}
-	case v3.QueryTypeClickHouseSQL:
-		queries := make(map[string]string)
-		for name, query := range queryRangeParams.CompositeQuery.ClickHouseQueries {
-			if query.Disabled {
-				continue
-			}
-			queries[name] = query.Query
-		}
-		result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries)
-	case v3.QueryTypePromQL:
-		result, err, errQuriesByName = aH.execPromQueries(r.Context(), queryRangeParams)
-	default:
-		err = fmt.Errorf("invalid query type")
-		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName)
-		return
 	}
+
+	result, err, errQuriesByName = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys)

 	if err != nil {
 		apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
View File

@@ -82,6 +82,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) {
 				},
 				"B": {
 					QueryName:          "B",
+					StepInterval:       60,
 					AggregateAttribute: v3.AttributeKey{Key: "name2"},
 					AggregateOperator:  v3.AggregateOperatorRateMax,
 					Expression:         "B",

View File

@@ -844,7 +844,6 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ
 	dataSource := v3.DataSource(r.URL.Query().Get("dataSource"))
 	aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator"))
 	aggregateAttribute := r.URL.Query().Get("aggregateAttribute")
-
 	limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
 	if err != nil {
 		limit = 50

View File

@@ -0,0 +1,304 @@
package querier

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
	metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
	tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
	"go.signoz.io/signoz/pkg/query-service/cache/status"
	"go.signoz.io/signoz/pkg/query-service/constants"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.uber.org/zap"
)

func (q *querier) runBuilderQuery(
	ctx context.Context,
	builderQuery *v3.BuilderQuery,
	params *v3.QueryRangeParamsV3,
	keys map[string]v3.AttributeKey,
	cacheKeys map[string]string,
	ch chan channelResult,
	wg *sync.WaitGroup,
) {
	defer wg.Done()
	queryName := builderQuery.QueryName

	var preferRPM bool

	if q.featureLookUp != nil {
		preferRPM = q.featureLookUp.CheckFeature(constants.PreferRPM) == nil
	}

	// TODO: handle other data sources
	if builderQuery.DataSource == v3.DataSourceLogs {
		var query string
		var err error
		// for ts query with limit replace it as it is already formed
		if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
			limitQuery, err := logsV3.PrepareLogsQuery(
				params.Start,
				params.End,
				params.CompositeQuery.QueryType,
				params.CompositeQuery.PanelType,
				builderQuery,
				logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
			)
			if err != nil {
				ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
				return
			}
			placeholderQuery, err := logsV3.PrepareLogsQuery(
				params.Start,
				params.End,
				params.CompositeQuery.QueryType,
				params.CompositeQuery.PanelType,
				builderQuery,
				logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
			)
			if err != nil {
				ch <- channelResult{Err: err, Name: queryName, Query: placeholderQuery, Series: nil}
				return
			}
			query = fmt.Sprintf(placeholderQuery, limitQuery)
		} else {
			query, err = logsV3.PrepareLogsQuery(
				params.Start,
				params.End,
				params.CompositeQuery.QueryType,
				params.CompositeQuery.PanelType,
				builderQuery,
				logsV3.Options{PreferRPM: preferRPM},
			)
			if err != nil {
				ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
				return
			}
		}

		if err != nil {
			ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
			return
		}

		series, err := q.execClickHouseQuery(ctx, query)
		ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
		return
	}

	if builderQuery.DataSource == v3.DataSourceTraces {
		var query string
		var err error
		// for ts query with group by and limit form two queries
		if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
			limitQuery, err := tracesV3.PrepareTracesQuery(
				params.Start,
				params.End,
				params.CompositeQuery.PanelType,
				builderQuery,
				keys,
				tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
			)
			if err != nil {
				ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
				return
			}
			placeholderQuery, err := tracesV3.PrepareTracesQuery(
				params.Start,
				params.End,
				params.CompositeQuery.PanelType,
				builderQuery,
				keys,
				tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
			)
			if err != nil {
				ch <- channelResult{Err: err, Name: queryName, Query: placeholderQuery, Series: nil}
				return
			}
			query = fmt.Sprintf(placeholderQuery, limitQuery)
		} else {
			query, err = tracesV3.PrepareTracesQuery(
				params.Start,
				params.End,
				params.CompositeQuery.PanelType,
				builderQuery,
				keys,
				tracesV3.Options{PreferRPM: preferRPM},
			)
			if err != nil {
				ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
				return
			}
		}

		series, err := q.execClickHouseQuery(ctx, query)
		ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
		return
	}

	// What is happening here?
	// We are only caching the graph panel queries. A non-existent cache key means that the query is not cached.
	// If the query is not cached, we execute the query and return the result without caching it.
	if _, ok := cacheKeys[queryName]; !ok {
		query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
		if err != nil {
			ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
			return
		}
		series, err := q.execClickHouseQuery(ctx, query)
		ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
		return
	}

	cacheKey := cacheKeys[queryName]
	var cachedData []byte
	if !params.NoCache && q.cache != nil {
		var retrieveStatus status.RetrieveStatus
		data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
		zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
		if err == nil {
			cachedData = data
		}
	}
	misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
	missedSeries := make([]*v3.Series, 0)
	cachedSeries := make([]*v3.Series, 0)
	for _, miss := range misses {
		query, err := metricsV3.PrepareMetricQuery(
			miss.start,
			miss.end,
			params.CompositeQuery.QueryType,
			params.CompositeQuery.PanelType,
			builderQuery,
			metricsV3.Options{},
		)
		if err != nil {
			ch <- channelResult{
				Err:    err,
				Name:   queryName,
				Query:  query,
				Series: nil,
			}
			return
		}
		series, err := q.execClickHouseQuery(ctx, query)
		if err != nil {
			ch <- channelResult{
				Err:    err,
				Name:   queryName,
				Query:  query,
				Series: nil,
			}
			return
		}
		missedSeries = append(missedSeries, series...)
	}
	if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
		zap.S().Error("error unmarshalling cached data", zap.Error(err))
	}
	mergedSeries := mergeSerieses(cachedSeries, missedSeries)

	ch <- channelResult{
		Err:    nil,
		Name:   queryName,
		Series: mergedSeries,
	}
	// Cache the seriesList for future queries
	if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
		mergedSeriesData, err := json.Marshal(mergedSeries)
		if err != nil {
			zap.S().Error("error marshalling merged series", zap.Error(err))
			return
		}
		err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
		if err != nil {
			zap.S().Error("error storing merged series", zap.Error(err))
			return
		}
	}
}

func (q *querier) runBuilderExpression(
	ctx context.Context,
	builderQuery *v3.BuilderQuery,
	params *v3.QueryRangeParamsV3,
	keys map[string]v3.AttributeKey,
	cacheKeys map[string]string,
	ch chan channelResult,
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	queryName := builderQuery.QueryName

	queries, err := q.builder.PrepareQueries(params, keys)
	if err != nil {
		ch <- channelResult{Err: err, Name: queryName, Query: "", Series: nil}
		return
	}

	if _, ok := cacheKeys[queryName]; !ok {
		query := queries[queryName]
		series, err := q.execClickHouseQuery(ctx, query)
		ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
		return
	}

	cacheKey := cacheKeys[queryName]
	var cachedData []byte
	if !params.NoCache && q.cache != nil {
		var retrieveStatus status.RetrieveStatus
		data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
		zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
		if err == nil {
			cachedData = data
		}
	}
	misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
	missedSeries := make([]*v3.Series, 0)
	cachedSeries := make([]*v3.Series, 0)
	for _, miss := range misses {
		missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{
			Start:          miss.start,
			End:            miss.end,
			Step:           params.Step,
			NoCache:        params.NoCache,
			CompositeQuery: params.CompositeQuery,
			Variables:      params.Variables,
		}, keys)
		query := missQueries[queryName]
		series, err := q.execClickHouseQuery(ctx, query)
		if err != nil {
			ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
			return
		}
		missedSeries = append(missedSeries, series...)
	}
	if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
		zap.S().Error("error unmarshalling cached data", zap.Error(err))
	}
	mergedSeries := mergeSerieses(cachedSeries, missedSeries)

	ch <- channelResult{
		Err:    nil,
		Name:   queryName,
		Series: mergedSeries,
	}
	// Cache the seriesList for future queries
	if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
		mergedSeriesData, err := json.Marshal(mergedSeries)
		if err != nil {
			zap.S().Error("error marshalling merged series", zap.Error(err))
			return
		}
		err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
		if err != nil {
			zap.S().Error("error storing merged series", zap.Error(err))
			return
		}
	}
}
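
The cache logic above hinges on findMissingTimeRanges (defined in querier.go below): only the sub-ranges of [start, end] not covered by cached series are fetched from ClickHouse, and fetched plus cached series are then merged and re-stored. A self-contained sketch of that idea, with illustrative names (findMisses, cachedStart/cachedEnd) rather than the package's actual helpers:

package main

import "fmt"

type missInterval struct {
	start, end int64 // in milliseconds
}

// findMisses returns the parts of [start, end] that a single cached window
// [cachedStart, cachedEnd] does not cover.
func findMisses(start, end, cachedStart, cachedEnd int64) []missInterval {
	misses := make([]missInterval, 0)
	if cachedStart > start {
		misses = append(misses, missInterval{start: start, end: cachedStart})
	}
	if cachedEnd < end {
		misses = append(misses, missInterval{start: cachedEnd, end: end})
	}
	return misses
}

func main() {
	// Cache covers 10:00-12:00, request asks for 11:00-13:00:
	// only 12:00-13:00 has to be queried.
	fmt.Println(findMisses(1100, 1300, 1000, 1200)) // [{1200 1300}]
}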

View File

@@ -7,20 +7,30 @@ import (
 	"math"
 	"sort"
 	"strings"
+	"sync"
 	"time"

 	logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
 	metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
+	"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
 	tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
 	"go.signoz.io/signoz/pkg/query-service/cache"
-	"go.signoz.io/signoz/pkg/query-service/cache/status"
 	"go.signoz.io/signoz/pkg/query-service/interfaces"
 	"go.signoz.io/signoz/pkg/query-service/model"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+	"go.uber.org/multierr"
 	"go.uber.org/zap"
 )

+type channelResult struct {
+	Series []*v3.Series
+	List   []*v3.Row
+	Err    error
+	Name   string
+	Query  string
+}
+
 type missInterval struct {
 	start, end int64 // in milliseconds
 }
@@ -32,6 +42,9 @@ type querier struct {
 	fluxInterval time.Duration

+	builder       *queryBuilder.QueryBuilder
+	featureLookUp interfaces.FeatureLookup
+
 	// used for testing
 	// TODO(srikanthccv): remove this once we have a proper mock
 	testingMode bool
@@ -45,6 +58,7 @@ type QuerierOptions struct {
 	Cache        cache.Cache
 	KeyGenerator cache.KeyGenerator
 	FluxInterval time.Duration
+	FeatureLookup interfaces.FeatureLookup

 	// used for testing
 	TestingMode bool
@@ -59,6 +73,13 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
 		keyGenerator: opts.KeyGenerator,
 		fluxInterval: opts.FluxInterval,

+		builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
+			BuildTraceQuery:  tracesV3.PrepareTracesQuery,
+			BuildLogQuery:    logsV3.PrepareLogsQuery,
+			BuildMetricQuery: metricsV3.PrepareMetricQuery,
+		}, opts.FeatureLookup),
+		featureLookUp: opts.FeatureLookup,
+
 		testingMode:    opts.TestingMode,
 		returnedSeries: opts.ReturnedSeries,
 		returnedErr:    opts.ReturnedErr,
@@ -160,7 +181,7 @@ func findMissingTimeRanges(start, end int64, seriesList []*v3.Series, fluxInterv
 	var validMisses []missInterval
 	for idx := range misses {
 		miss := misses[idx]
-		if miss.start <= miss.end {
+		if miss.start < miss.end {
 			validMisses = append(validMisses, miss)
 		}
 	}
@@ -223,206 +244,246 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
 	return mergedSeries
 }

-func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) {
-	cacheKeys := q.keyGenerator.GenerateKeys(params)
-	seriesList := make([]*v3.Series, 0)
-	errQueriesByName := make(map[string]string)
-	var err error
-	for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
-		// TODO: add support for logs and traces
-		if builderQuery.DataSource == v3.DataSourceLogs {
-			query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, logsV3.Options{})
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			series, err := q.execClickHouseQuery(ctx, query)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			seriesList = append(seriesList, series...)
-			continue
-		}
-		if builderQuery.DataSource == v3.DataSourceTraces {
-			query, err := tracesV3.PrepareTracesQuery(params.Start, params.End, params.CompositeQuery.PanelType, builderQuery, keys, tracesV3.Options{
-				GraphLimitQtype: "",
-				PreferRPM:       false,
-			})
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			series, err := q.execClickHouseQuery(ctx, query)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			seriesList = append(seriesList, series...)
-			continue
-		}
-		cacheKey := cacheKeys[queryName]
-		var cachedData []byte
-		if !params.NoCache {
-			var retrieveStatus status.RetrieveStatus
-			cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true)
-			zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
-			if err != nil {
-				return nil, err, nil
-			}
-		}
-		misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
-		missedSeries := make([]*v3.Series, 0)
-		cachedSeries := make([]*v3.Series, 0)
-		for _, miss := range misses {
-			query, err := metricsV3.PrepareMetricQuery(
-				miss.start,
-				miss.end,
-				params.CompositeQuery.QueryType,
-				params.CompositeQuery.PanelType,
-				builderQuery,
-				metricsV3.Options{PreferRPM: false},
-			)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			series, err := q.execClickHouseQuery(ctx, query)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			missedSeries = append(missedSeries, series...)
-		}
-		if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
-			errQueriesByName[queryName] = err.Error()
-			continue
-		}
-		mergedSeries := mergeSerieses(cachedSeries, missedSeries)
-		seriesList = append(seriesList, mergedSeries...)
-		// Cache the seriesList for future queries
-		if len(missedSeries) > 0 {
-			mergedSeriesData, err := json.Marshal(mergedSeries)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-		}
-	}
-	if len(errQueriesByName) > 0 {
-		err = fmt.Errorf("error in builder queries")
-	}
-	return seriesList, err, errQueriesByName
-}
+func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) {
+
+	cacheKeys := q.keyGenerator.GenerateKeys(params)
+
+	ch := make(chan channelResult, len(params.CompositeQuery.BuilderQueries))
+	var wg sync.WaitGroup
+
+	for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
+		if builderQuery.Disabled {
+			continue
+		}
+		wg.Add(1)
+		if queryName == builderQuery.Expression {
+			go q.runBuilderQuery(ctx, builderQuery, params, keys, cacheKeys, ch, &wg)
+		} else {
+			go q.runBuilderExpression(ctx, builderQuery, params, keys, cacheKeys, ch, &wg)
+		}
+	}
+
+	wg.Wait()
+	close(ch)
+
+	results := make([]*v3.Result, 0)
+	errQueriesByName := make(map[string]string)
+	var errs []error
+
+	for result := range ch {
+		if result.Err != nil {
+			errs = append(errs, result.Err)
+			errQueriesByName[result.Name] = result.Err.Error()
+			continue
+		}
+		results = append(results, &v3.Result{
+			QueryName: result.Name,
+			Series:    result.Series,
+		})
+	}
+
+	var err error
+	if len(errs) > 0 {
+		err = fmt.Errorf("error in builder queries")
+	}
+
+	return results, err, errQueriesByName
+}

-func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) {
-	seriesList := make([]*v3.Series, 0)
-	errQueriesByName := make(map[string]string)
-	var err error
-	for queryName, promQuery := range params.CompositeQuery.PromQueries {
-		cacheKey := q.keyGenerator.GenerateKeys(params)[queryName]
-		var cachedData []byte
-		var retrieveStatus status.RetrieveStatus
-		if !params.NoCache {
-			cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true)
-			zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
-		}
-		if err != nil {
-			errQueriesByName[queryName] = err.Error()
-			continue
-		}
-		misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
-		missedSeries := make([]*v3.Series, 0)
-		cachedSeries := make([]*v3.Series, 0)
-		for _, miss := range misses {
-			query := metricsV3.BuildPromQuery(
-				promQuery,
-				params.Step,
-				miss.start,
-				miss.end,
-			)
-			series, err := q.execPromQuery(ctx, query)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			missedSeries = append(missedSeries, series...)
-		}
-		if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
-			errQueriesByName[queryName] = err.Error()
-			continue
-		}
-		mergedSeries := mergeSerieses(cachedSeries, missedSeries)
-		seriesList = append(seriesList, mergedSeries...)
-		// Cache the seriesList for future queries
-		if len(missedSeries) > 0 {
-			mergedSeriesData, err := json.Marshal(mergedSeries)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-			err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
-			if err != nil {
-				errQueriesByName[queryName] = err.Error()
-				continue
-			}
-		}
-	}
-	if len(errQueriesByName) > 0 {
-		err = fmt.Errorf("error in prom queries")
-	}
-	return seriesList, err, errQueriesByName
-}
+func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) {
+	channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries))
+	var wg sync.WaitGroup
+	cacheKeys := q.keyGenerator.GenerateKeys(params)
+	for queryName, promQuery := range params.CompositeQuery.PromQueries {
+		if promQuery.Disabled {
+			continue
+		}
+		wg.Add(1)
+		go func(queryName string, promQuery *v3.PromQuery) {
+			defer wg.Done()
+			cacheKey := cacheKeys[queryName]
+			var cachedData []byte
+			// Ensure NoCache is not set and cache is not nil
+			if !params.NoCache && q.cache != nil {
+				data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
+				zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
+				if err == nil {
+					cachedData = data
+				}
+			}
+			misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
+			missedSeries := make([]*v3.Series, 0)
+			cachedSeries := make([]*v3.Series, 0)
+			for _, miss := range misses {
+				query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.start, miss.end)
+				series, err := q.execPromQuery(ctx, query)
+				if err != nil {
+					channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil}
+					return
+				}
+				missedSeries = append(missedSeries, series...)
+			}
+			if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
+				// ideally we should not be getting an error here
+				zap.S().Error("error unmarshalling cached data", zap.Error(err))
+			}
+			mergedSeries := mergeSerieses(cachedSeries, missedSeries)
+
+			channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries}
+
+			// Cache the seriesList for future queries
+			if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
+				mergedSeriesData, err := json.Marshal(mergedSeries)
+				if err != nil {
+					zap.S().Error("error marshalling merged series", zap.Error(err))
+					return
+				}
+				err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
+				if err != nil {
+					zap.S().Error("error storing merged series", zap.Error(err))
+					return
+				}
+			}
+		}(queryName, promQuery)
+	}
+	wg.Wait()
+	close(channelResults)
+
+	results := make([]*v3.Result, 0)
+	errQueriesByName := make(map[string]string)
+	var errs []error
+	for result := range channelResults {
+		if result.Err != nil {
+			errs = append(errs, result.Err)
+			errQueriesByName[result.Name] = result.Err.Error()
+			continue
+		}
+		results = append(results, &v3.Result{
+			QueryName: result.Name,
+			Series:    result.Series,
+		})
+	}
+	var err error
+	if len(errs) > 0 {
+		err = fmt.Errorf("error in prom queries")
+	}
+	return results, err, errQueriesByName
+}

-func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) {
-	seriesList := make([]*v3.Series, 0)
-	errQueriesByName := make(map[string]string)
-	var err error
-	for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries {
-		series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query)
-		if err != nil {
-			errQueriesByName[queryName] = err.Error()
-			continue
-		}
-		seriesList = append(seriesList, series...)
-	}
-	if len(errQueriesByName) > 0 {
-		err = fmt.Errorf("error in clickhouse queries")
-	}
-	return seriesList, err, errQueriesByName
-}
+func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) {
+	channelResults := make(chan channelResult, len(params.CompositeQuery.ClickHouseQueries))
+	var wg sync.WaitGroup
+	for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries {
+		if clickHouseQuery.Disabled {
+			continue
+		}
+		wg.Add(1)
+		go func(queryName string, clickHouseQuery *v3.ClickHouseQuery) {
+			defer wg.Done()
+			series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query)
+			channelResults <- channelResult{Err: err, Name: queryName, Query: clickHouseQuery.Query, Series: series}
+		}(queryName, clickHouseQuery)
+	}
+	wg.Wait()
+	close(channelResults)
+
+	results := make([]*v3.Result, 0)
+	errQueriesByName := make(map[string]string)
+	var errs []error
+	for result := range channelResults {
+		if result.Err != nil {
+			errs = append(errs, result.Err)
+			errQueriesByName[result.Name] = result.Err.Error()
+			continue
+		}
+		results = append(results, &v3.Result{
+			QueryName: result.Name,
+			Series:    result.Series,
+		})
+	}
+	var err error
+	if len(errs) > 0 {
+		err = fmt.Errorf("error in clickhouse queries")
+	}
+	return results, err, errQueriesByName
+}
+
+func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) {
+	queries, err := q.builder.PrepareQueries(params, keys)
+	if err != nil {
+		return nil, err, nil
+	}
+
+	ch := make(chan channelResult, len(queries))
+	var wg sync.WaitGroup
+
+	for name, query := range queries {
+		wg.Add(1)
+		go func(name, query string) {
+			defer wg.Done()
+			rowList, err := q.reader.GetListResultV3(ctx, query)
+			if err != nil {
+				ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
+				return
+			}
+			ch <- channelResult{List: rowList, Name: name, Query: query}
+		}(name, query)
+	}
+
+	wg.Wait()
+	close(ch)
+
+	var errs []error
+	errQuriesByName := make(map[string]string)
+	res := make([]*v3.Result, 0)
+	// read values from the channel
+	for r := range ch {
+		if r.Err != nil {
+			errs = append(errs, r.Err)
+			errQuriesByName[r.Name] = r.Query
+			continue
+		}
+		res = append(res, &v3.Result{
+			QueryName: r.Name,
+			List:      r.List,
+		})
+	}
+	if len(errs) != 0 {
+		return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
+	}
+	return res, nil, nil
+}

-func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) {
-	var seriesList []*v3.Series
+func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) {
+	var results []*v3.Result
 	var err error
 	var errQueriesByName map[string]string
 	if params.CompositeQuery != nil {
 		switch params.CompositeQuery.QueryType {
 		case v3.QueryTypeBuilder:
-			seriesList, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys)
+			if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace {
+				results, err, errQueriesByName = q.runBuilderListQueries(ctx, params, keys)
+			} else {
+				results, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys)
+			}
 		case v3.QueryTypePromQL:
-			seriesList, err, errQueriesByName = q.runPromQueries(ctx, params)
+			results, err, errQueriesByName = q.runPromQueries(ctx, params)
 		case v3.QueryTypeClickHouseSQL:
-			seriesList, err, errQueriesByName = q.runClickHouseQueries(ctx, params)
+			results, err, errQueriesByName = q.runClickHouseQueries(ctx, params)
 		default:
 			err = fmt.Errorf("invalid query type")
 		}
 	}
-	return seriesList, err, errQueriesByName
+	return results, err, errQueriesByName
 }

 func (q *querier) QueriesExecuted() []string {
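
Condensed to its essentials, the fan-out/fan-in pattern the reworked runners share looks like the sketch below; names mirror the diff, while the fake "query execution" is purely illustrative:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type channelResult struct {
	Name string
	Err  error
}

func main() {
	queries := map[string]string{"A": "SELECT 1", "B": "bad sql"}
	// Buffered to len(queries) so no goroutine ever blocks on send.
	ch := make(chan channelResult, len(queries))
	var wg sync.WaitGroup
	for name, query := range queries {
		wg.Add(1)
		go func(name, query string) {
			defer wg.Done()
			var err error
			if query == "bad sql" { // stand-in for executing the real query
				err = errors.New("syntax error")
			}
			ch <- channelResult{Name: name, Err: err}
		}(name, query)
	}
	wg.Wait()
	close(ch) // safe: all senders are done, so the range below terminates
	errQueriesByName := make(map[string]string)
	for r := range ch {
		if r.Err != nil {
			errQueriesByName[r.Name] = r.Err.Error()
		}
	}
	fmt.Println(errQueriesByName) // map[B:syntax error]
}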

View File

@@ -406,9 +406,11 @@ func TestQueryRange(t *testing.T) {
 			End:   1675115596722 + 120*60*1000,
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
+				PanelType: v3.PanelTypeGraph,
 				BuilderQueries: map[string]*v3.BuilderQuery{
 					"A": {
 						QueryName:          "A",
+						DataSource:         v3.DataSourceMetrics,
 						StepInterval:       60,
 						AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
 						Filters: &v3.FilterSet{
@@ -436,11 +438,13 @@ func TestQueryRange(t *testing.T) {
 			End:   1675115596722 + 180*60*1000,
 			CompositeQuery: &v3.CompositeQuery{
 				QueryType: v3.QueryTypeBuilder,
+				PanelType: v3.PanelTypeGraph,
 				BuilderQueries: map[string]*v3.BuilderQuery{
 					"A": {
 						QueryName:          "A",
 						StepInterval:       60,
 						AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
+						DataSource:         v3.DataSourceMetrics,
 						Filters: &v3.FilterSet{
 							Operator: "AND",
 							Items: []v3.FilterItem{
@@ -461,6 +465,73 @@ func TestQueryRange(t *testing.T) {
 				},
 			},
 		},
+		// No caching for traces & logs yet
+		{
+			Start: 1675115596722,
+			End:   1675115596722 + 120*60*1000,
+			Step:  5 * time.Minute.Milliseconds(),
+			CompositeQuery: &v3.CompositeQuery{
+				QueryType: v3.QueryTypeBuilder,
+				PanelType: v3.PanelTypeGraph,
+				BuilderQueries: map[string]*v3.BuilderQuery{
+					"A": {
+						QueryName:          "A",
+						AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
+						StepInterval:       60,
+						DataSource:         v3.DataSourceTraces,
+						Filters: &v3.FilterSet{
+							Operator: "AND",
+							Items: []v3.FilterItem{
+								{
+									Key:      v3.AttributeKey{Key: "method", IsColumn: false},
+									Operator: "=",
+									Value:    "GET",
+								},
+							},
+						},
+						GroupBy: []v3.AttributeKey{
+							{Key: "serviceName", IsColumn: false},
+							{Key: "name", IsColumn: false},
+						},
+						AggregateOperator: v3.AggregateOperatorP95,
+						Expression:        "A",
+					},
+				},
+			},
+		},
+		{
+			Start: 1675115596722 + 60*60*1000,
+			End:   1675115596722 + 180*60*1000,
+			Step:  5 * time.Minute.Milliseconds(),
+			CompositeQuery: &v3.CompositeQuery{
+				QueryType: v3.QueryTypeBuilder,
+				PanelType: v3.PanelTypeGraph,
+				BuilderQueries: map[string]*v3.BuilderQuery{
+					"A": {
+						QueryName:          "A",
+						AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
+						StepInterval:       60,
+						DataSource:         v3.DataSourceTraces,
+						Filters: &v3.FilterSet{
+							Operator: "AND",
+							Items: []v3.FilterItem{
+								{
+									Key:      v3.AttributeKey{Key: "method", IsColumn: false},
+									Operator: "=",
+									Value:    "GET",
+								},
+							},
+						},
+						GroupBy: []v3.AttributeKey{
+							{Key: "serviceName", IsColumn: false},
+							{Key: "name", IsColumn: false},
+						},
+						AggregateOperator: v3.AggregateOperatorP95,
+						Expression:        "A",
+					},
+				},
+			},
+		},
 	}

 	cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
 	opts := QuerierOptions{
@@ -489,6 +560,117 @@ func TestQueryRange(t *testing.T) {
 	expectedTimeRangeInQueryString := []string{
 		fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000),
 		fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000),
+		fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", 1675115580000*1000000, (1675115580000+120*60*1000)*int64(1000000)),
+		fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)),
+	}
+
+	for i, param := range params {
+		_, err, errByName := q.QueryRange(context.Background(), param, nil)
+		if err != nil {
+			t.Errorf("expected no error, got %s", err)
+		}
+		if len(errByName) > 0 {
+			t.Errorf("expected no error, got %v", errByName)
+		}
+
+		if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) {
+			t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i])
+		}
+	}
+}
+
+func TestQueryRangeValueType(t *testing.T) {
+	// There shouldn't be any caching for value panel type
+	params := []*v3.QueryRangeParamsV3{
+		{
+			Start: 1675115596722,
+			End:   1675115596722 + 120*60*1000,
+			Step:  5 * time.Minute.Milliseconds(),
+			CompositeQuery: &v3.CompositeQuery{
+				QueryType: v3.QueryTypeBuilder,
+				PanelType: v3.PanelTypeValue,
+				BuilderQueries: map[string]*v3.BuilderQuery{
+					"A": {
+						QueryName:          "A",
+						StepInterval:       60,
+						DataSource:         v3.DataSourceMetrics,
+						AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
+						Filters: &v3.FilterSet{
+							Operator: "AND",
+							Items: []v3.FilterItem{
+								{
+									Key:      v3.AttributeKey{Key: "method", IsColumn: false},
+									Operator: "=",
+									Value:    "GET",
+								},
+							},
+						},
+						AggregateOperator: v3.AggregateOperatorSumRate,
+						Expression:        "A",
+						ReduceTo:          v3.ReduceToOperatorLast,
+					},
+				},
+			},
+		},
+		{
+			Start: 1675115596722 + 60*60*1000,
+			End:   1675115596722 + 180*60*1000,
+			Step:  5 * time.Minute.Milliseconds(),
+			CompositeQuery: &v3.CompositeQuery{
+				QueryType: v3.QueryTypeBuilder,
+				PanelType: v3.PanelTypeValue,
+				BuilderQueries: map[string]*v3.BuilderQuery{
+					"A": {
+						QueryName:          "A",
+						StepInterval:       60,
+						DataSource:         v3.DataSourceTraces,
+						AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
+						Filters: &v3.FilterSet{
+							Operator: "AND",
+							Items: []v3.FilterItem{
+								{
+									Key:      v3.AttributeKey{Key: "method", IsColumn: false},
+									Operator: "=",
+									Value:    "GET",
+								},
+							},
+						},
+						AggregateOperator: v3.AggregateOperatorP95,
+						Expression:        "A",
+						ReduceTo:          v3.ReduceToOperatorLast,
+					},
+				},
+			},
+		},
+	}
+	cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
+	opts := QuerierOptions{
+		Cache:        cache,
+		Reader:       nil,
+		FluxInterval: 5 * time.Minute,
+		KeyGenerator: queryBuilder.NewKeyGenerator(),
+
+		TestingMode: true,
+		ReturnedSeries: []*v3.Series{
+			{
+				Labels: map[string]string{
+					"method":       "GET",
+					"service_name": "test",
+					"__name__":     "http_server_requests_seconds_count",
+				},
+				Points: []v3.Point{
+					{Timestamp: 1675115596722, Value: 1},
+					{Timestamp: 1675115596722 + 60*60*1000, Value: 2},
+					{Timestamp: 1675115596722 + 120*60*1000, Value: 3},
+				},
+			},
+		},
+	}
+	q := NewQuerier(opts)
+	// No caching
+	expectedTimeRangeInQueryString := []string{
+		fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000),
+		fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)),
 	}

 	for i, param := range params {
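
A note on the expected strings: the raw request start 1675115596722 never appears in the generated SQL because the builder snaps the window to the 60-second step (presumably start - start%(step*1000); the exact rounding lives in the query builders, not this diff). The arithmetic checks out:

package main

import "fmt"

func main() {
	start := int64(1675115596722) // ms
	stepMs := int64(60 * 1000)
	fmt.Println(start - start%stepMs) // 1675115580000, the value asserted above
}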

View File

@@ -275,12 +275,35 @@ func expressionToKey(expression *govaluate.EvaluableExpression, keys map[string]
 	return formula.ExpressionString()
 }

+func isMetricExpression(expression *govaluate.EvaluableExpression, params *v3.QueryRangeParamsV3) bool {
+	variables := unique(expression.Vars())
+	for _, variable := range variables {
+		if params.CompositeQuery.BuilderQueries[variable].DataSource != v3.DataSourceMetrics {
+			return false
+		}
+	}
+	return true
+}
+
 func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string {
 	keys := make(map[string]string)
+
+	// For non-graph panels, we don't support caching
+	if params.CompositeQuery.PanelType != v3.PanelTypeGraph {
+		return keys
+	}
+
+	// Use query as the cache key for PromQL queries
+	if params.CompositeQuery.QueryType == v3.QueryTypePromQL {
+		for name, query := range params.CompositeQuery.PromQueries {
+			keys[name] = query.Query
+		}
+		return keys
+	}
+
 	// Build keys for each builder query
 	for queryName, query := range params.CompositeQuery.BuilderQueries {
-		if query.Expression == queryName {
+		if query.Expression == queryName && query.DataSource == v3.DataSourceMetrics {
 			var parts []string

 			// We need to build uniqe cache query for BuilderQuery
@@ -321,6 +344,10 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
 		if query.Expression != query.QueryName {
 			expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs)

+			if !isMetricExpression(expression, params) {
+				continue
+			}
+
 			expressionCacheKey := expressionToKey(expression, keys)
 			keys[query.QueryName] = expressionCacheKey
 		}
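
isMetricExpression walks the variables of a formula to decide cacheability: an expression such as "A/B" is only given a cache key when every referenced builder query is a metrics query. The variable extraction comes from govaluate; a tiny demonstration (the upstream github.com/Knetic/govaluate import path is assumed here; SigNoz may use a fork):

package main

import (
	"fmt"

	"github.com/Knetic/govaluate"
)

func main() {
	expr, err := govaluate.NewEvaluableExpression("A / B")
	if err != nil {
		panic(err)
	}
	// Vars() lists referenced variable names, i.e. the builder query names.
	fmt.Println(expr.Vars()) // [A B]
}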

View File

@@ -22,11 +22,12 @@ import (
 	"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
 	"go.signoz.io/signoz/pkg/query-service/app/dashboards"
 	"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
-	opamp "go.signoz.io/signoz/pkg/query-service/app/opamp"
+	"go.signoz.io/signoz/pkg/query-service/app/opamp"
 	opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"

 	"go.signoz.io/signoz/pkg/query-service/app/explorer"
 	"go.signoz.io/signoz/pkg/query-service/auth"
+	"go.signoz.io/signoz/pkg/query-service/cache"
 	"go.signoz.io/signoz/pkg/query-service/constants"
 	"go.signoz.io/signoz/pkg/query-service/dao"
 	"go.signoz.io/signoz/pkg/query-service/featureManager"
@@ -54,6 +55,8 @@ type ServerOptions struct {
 	MaxIdleConns    int
 	MaxOpenConns    int
 	DialTimeout     time.Duration
+	CacheConfigPath string
+	FluxInterval    string
 }

 // Server runs HTTP, Mux and a grpc server
@@ -134,6 +137,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		return nil, err
 	}

+	var c cache.Cache
+	if serverOptions.CacheConfigPath != "" {
+		cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
+		if err != nil {
+			return nil, err
+		}
+		c = cache.NewCache(cacheOpts)
+	}
+
+	fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
+
 	// ingestion pipelines manager
 	logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite")
 	if err != nil {
@@ -153,6 +166,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
 		RuleManager:                   rm,
 		FeatureFlags:                  fm,
 		LogsParsingPipelineController: logParsingPipelineController,
+		Cache:                         c,
+		FluxInterval:                  fluxInterval,
 	})
 	if err != nil {
 		return nil, err

View File

@@ -0,0 +1,4 @@
provider: "inmemory"
inmemory:
  ttl: 60m
  cleanupInterval: 10m
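
This sample file is what --experimental.cache-config consumes; it selects the in-memory provider with a 60-minute TTL. The querier tests construct the same thing directly; a minimal sketch (the inmemory import path is inferred from the cache package layout):

package main

import (
	"time"

	"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
)

func main() {
	c := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
	_ = c // satisfies the cache.Cache interface the querier consumes
}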

View File

@@ -100,7 +100,7 @@ type Reader interface {
 }

 type Querier interface {
-	QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string)
+	QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string)

 	// test helpers
 	QueriesExecuted() []string

View File

@@ -33,7 +33,7 @@ func main() {
 	var disableRules bool

 	// the url used to build link in the alert messages in slack and other systems
-	var ruleRepoURL string
+	var ruleRepoURL, cacheConfigPath, fluxInterval string

 	var preferDelta bool
 	var preferSpanMetrics bool
@@ -51,6 +51,8 @@ func main() {
 	flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)")
 	flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)")
 	flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
+	flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
+	flag.StringVar(&fluxInterval, "flux-interval", "5m", "(flux interval)")
 	flag.Parse()

 	loggerMgr := initZapLog()
@@ -72,6 +74,8 @@ func main() {
 		MaxIdleConns:    maxIdleConns,
 		MaxOpenConns:    maxOpenConns,
 		DialTimeout:     dialTimeout,
+		CacheConfigPath: cacheConfigPath,
+		FluxInterval:    fluxInterval,
 	}

 	// Read the jwt secret key

View File

@@ -53,7 +53,6 @@ func (uf UserFlag) Value() (driver.Value, error) {
 }

 func (uf *UserFlag) Scan(value interface{}) error {
-	fmt.Println(" value:", value)
 	if value == "" {
 		return nil
 	}

View File

@@ -136,6 +136,7 @@ type QueryRangeParamsV2 struct {
 	Step                 int64                  `json:"step"`
 	CompositeMetricQuery *CompositeMetricQuery  `json:"compositeMetricQuery"`
 	Variables            map[string]interface{} `json:"variables,omitempty"`
+	NoCache              bool                   `json:"noCache"`
 }

 type DashboardVars struct {

View File

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
+	"sort"
 	"strconv"
 	"time"
@@ -494,6 +495,12 @@ type Series struct {
 	Points []MetricPoint `json:"values"`
 }

+func (s *Series) SortPoints() {
+	sort.Slice(s.Points, func(i, j int) bool {
+		return s.Points[i].Timestamp < s.Points[j].Timestamp
+	})
+}
+
 type MetricPoint struct {
 	Timestamp int64
 	Value     float64
@@ -505,6 +512,17 @@ func (p *MetricPoint) MarshalJSON() ([]byte, error) {
 	return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v})
 }

+// UnmarshalJSON implements json.Unmarshaler.
+func (p *MetricPoint) UnmarshalJSON(b []byte) error {
+	var a [2]interface{}
+	if err := json.Unmarshal(b, &a); err != nil {
+		return err
+	}
+	p.Timestamp = int64(a[0].(float64) * 1000)
+	p.Value, _ = strconv.ParseFloat(a[1].(string), 64)
+	return nil
+}
+
 type ShowCreateTableStatement struct {
 	Statement string `json:"statement" ch:"statement"`
 }
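
UnmarshalJSON is the inverse of the MarshalJSON above it: points travel as a two-element JSON array of epoch-seconds (float) and a stringified value, which is what lets cached series round-trip through the cache as bytes. A minimal round-trip sketch with a local copy of the type (a timestamp with an exactly representable fraction is chosen so the *1000 conversion is lossless):

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

type MetricPoint struct {
	Timestamp int64 // milliseconds
	Value     float64
}

func (p *MetricPoint) UnmarshalJSON(b []byte) error {
	var a [2]interface{}
	if err := json.Unmarshal(b, &a); err != nil {
		return err
	}
	p.Timestamp = int64(a[0].(float64) * 1000) // seconds -> ms
	p.Value, _ = strconv.ParseFloat(a[1].(string), 64)
	return nil
}

func main() {
	var p MetricPoint
	if err := json.Unmarshal([]byte(`[1675115596.5, "1.5"]`), &p); err != nil {
		panic(err)
	}
	fmt.Println(p.Timestamp, p.Value) // 1675115596500 1.5
}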