diff --git a/Makefile b/Makefile index 1761acd26d..05b6cb27e6 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ LOCAL_GOOS ?= $(shell go env GOOS) LOCAL_GOARCH ?= $(shell go env GOARCH) REPONAME ?= signoz -DOCKER_TAG ?= latest +DOCKER_TAG ?= $(subst v,,$(BUILD_VERSION)) FRONTEND_DOCKER_IMAGE ?= frontend QUERY_SERVICE_DOCKER_IMAGE ?= query-service diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 9239be2d99..5cdca8e204 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -10,6 +10,7 @@ import ( "go.signoz.io/signoz/ee/query-service/license" baseapp "go.signoz.io/signoz/pkg/query-service/app" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" + "go.signoz.io/signoz/pkg/query-service/cache" baseint "go.signoz.io/signoz/pkg/query-service/interfaces" basemodel "go.signoz.io/signoz/pkg/query-service/model" rules "go.signoz.io/signoz/pkg/query-service/rules" @@ -29,6 +30,9 @@ type APIHandlerOptions struct { FeatureFlags baseint.FeatureLookup LicenseManager *license.Manager LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController + Cache cache.Cache + // Querier Influx Interval + FluxInterval time.Duration } type APIHandler struct { @@ -51,6 +55,8 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { RuleManager: opts.RulesManager, FeatureFlags: opts.FeatureFlags, LogsParsingPipelineController: opts.LogsParsingPipelineController, + Cache: opts.Cache, + FluxInterval: opts.FluxInterval, }) if err != nil { diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index aee2c87160..e36d201ec3 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -35,6 +35,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/opamp" opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" baseauth "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/cache" baseconst "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/healthcheck" basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" @@ -62,6 +63,8 @@ type ServerOptions struct { MaxIdleConns int MaxOpenConns int DialTimeout time.Duration + CacheConfigPath string + FluxInterval string } // Server runs HTTP api service @@ -189,6 +192,21 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { telemetry.GetInstance().SetReader(reader) + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } + + fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) + + if err != nil { + return nil, err + } + apiOpts := api.APIHandlerOptions{ DataConnector: reader, SkipConfig: skipConfig, @@ -202,6 +220,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { FeatureFlags: lm, LicenseManager: lm, LogsParsingPipelineController: logParsingPipelineController, + Cache: c, + FluxInterval: fluxInterval, } apiHandler, err := api.NewAPIHandler(apiOpts) diff --git a/ee/query-service/main.go b/ee/query-service/main.go index dcdedeb9db..d9b90340ae 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -82,6 +82,7 @@ func main() { // the url used to build link in the alert messages in slack and other systems var ruleRepoURL string + var cacheConfigPath, fluxInterval string var enableQueryServiceLogOTLPExport bool var preferDelta bool var preferSpanMetrics bool @@ -99,10 +100,14 @@ func main() { flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)") flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)") flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)") + flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") + flag.StringVar(&fluxInterval, "flux-interval", "5m", "(cache config to use)") flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)") + flag.Parse() loggerMgr := initZapLog(enableQueryServiceLogOTLPExport) + zap.ReplaceGlobals(loggerMgr) defer loggerMgr.Sync() // flushes buffer, if any @@ -121,6 +126,8 @@ func main() { MaxIdleConns: maxIdleConns, MaxOpenConns: maxOpenConns, DialTimeout: dialTimeout, + CacheConfigPath: cacheConfigPath, + FluxInterval: fluxInterval, } // Read the jwt secret key diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index a576b637e1..2d4b4c33ac 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -28,9 +28,11 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/metrics" metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/app/parser" + "go.signoz.io/signoz/pkg/query-service/app/querier" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" @@ -51,8 +53,9 @@ import ( type status string const ( - statusSuccess status = "success" - statusError status = "error" + statusSuccess status = "success" + statusError status = "error" + defaultFluxInterval = 5 * time.Minute ) // NewRouter creates and configures a Gorilla Router. @@ -73,6 +76,7 @@ type APIHandler struct { ruleManager *rules.Manager featureFlags interfaces.FeatureLookup ready func(http.HandlerFunc) http.HandlerFunc + querier interfaces.Querier queryBuilder *queryBuilder.QueryBuilder preferDelta bool preferSpanMetrics bool @@ -114,6 +118,11 @@ type APIHandlerOpts struct { // Log parsing pipelines LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController + // cache + Cache cache.Cache + + // Querier Influx Interval + FluxInterval time.Duration } // NewAPIHandler returns an APIHandler @@ -124,6 +133,16 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { return nil, err } + querierOpts := querier.QuerierOptions{ + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + } + + querier := querier.NewQuerier(querierOpts) + aH := &APIHandler{ reader: opts.Reader, appDao: opts.AppDao, @@ -137,6 +156,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { ruleManager: opts.RuleManager, featureFlags: opts.FeatureFlags, LogsParsingPipelineController: opts.LogsParsingPipelineController, + querier: querier, } builderOpts := queryBuilder.QueryBuilderOptions{ @@ -2965,9 +2985,8 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que var result []*v3.Result var err error var errQuriesByName map[string]string - var queries map[string]string - switch queryRangeParams.CompositeQuery.QueryType { - case v3.QueryTypeBuilder: + var spanKeys map[string]v3.AttributeKey + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { // check if any enrichment is required for logs if yes then enrich them if logsv3.EnrichmentRequired(queryRangeParams) { // get the fields if any logs query is present @@ -2981,42 +3000,16 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que logsv3.Enrich(queryRangeParams, fields) } - var spanKeys map[string]v3.AttributeKey spanKeys, err = aH.getSpanKeysV3(ctx, queryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err} RespondError(w, apiErrObj, errQuriesByName) return } - - queries, err = aH.queryBuilder.PrepareQueries(queryRangeParams, spanKeys) - if err != nil { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeList || queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTrace { - result, err, errQuriesByName = aH.execClickHouseListQueries(r.Context(), queries) - } else { - result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries) - } - case v3.QueryTypeClickHouseSQL: - queries := make(map[string]string) - for name, query := range queryRangeParams.CompositeQuery.ClickHouseQueries { - if query.Disabled { - continue - } - queries[name] = query.Query - } - result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries) - case v3.QueryTypePromQL: - result, err, errQuriesByName = aH.execPromQueries(r.Context(), queryRangeParams) - default: - err = fmt.Errorf("invalid query type") - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName) - return } + result, err, errQuriesByName = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys) + if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} RespondError(w, apiErrObj, errQuriesByName) diff --git a/pkg/query-service/app/metrics/v3/query_builder_test.go b/pkg/query-service/app/metrics/v3/query_builder_test.go index d315c52081..3f91641f30 100644 --- a/pkg/query-service/app/metrics/v3/query_builder_test.go +++ b/pkg/query-service/app/metrics/v3/query_builder_test.go @@ -82,6 +82,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { }, "B": { QueryName: "B", + StepInterval: 60, AggregateAttribute: v3.AttributeKey{Key: "name2"}, AggregateOperator: v3.AggregateOperatorRateMax, Expression: "B", diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 5c44ba19f4..8e8467458f 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -844,7 +844,6 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ dataSource := v3.DataSource(r.URL.Query().Get("dataSource")) aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator")) aggregateAttribute := r.URL.Query().Get("aggregateAttribute") - limit, err := strconv.Atoi(r.URL.Query().Get("limit")) if err != nil { limit = 50 diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go new file mode 100644 index 0000000000..4d809a18f3 --- /dev/null +++ b/pkg/query-service/app/querier/helper.go @@ -0,0 +1,304 @@ +package querier + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + "go.signoz.io/signoz/pkg/query-service/cache/status" + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/zap" +) + +func (q *querier) runBuilderQuery( + ctx context.Context, + builderQuery *v3.BuilderQuery, + params *v3.QueryRangeParamsV3, + keys map[string]v3.AttributeKey, + cacheKeys map[string]string, + ch chan channelResult, + wg *sync.WaitGroup, +) { + defer wg.Done() + queryName := builderQuery.QueryName + + var preferRPM bool + + if q.featureLookUp != nil { + preferRPM = q.featureLookUp.CheckFeature(constants.PreferRPM) == nil + } + + // TODO: handle other data sources + if builderQuery.DataSource == v3.DataSourceLogs { + var query string + var err error + // for ts query with limit replace it as it is already formed + if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { + limitQuery, err := logsV3.PrepareLogsQuery( + params.Start, + params.End, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} + return + } + placeholderQuery, err := logsV3.PrepareLogsQuery( + params.Start, + params.End, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: placeholderQuery, Series: nil} + return + } + query = fmt.Sprintf(placeholderQuery, limitQuery) + } else { + query, err = logsV3.PrepareLogsQuery( + params.Start, + params.End, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + } + + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + return + } + + if builderQuery.DataSource == v3.DataSourceTraces { + + var query string + var err error + // for ts query with group by and limit form two queries + if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { + limitQuery, err := tracesV3.PrepareTracesQuery( + params.Start, + params.End, + params.CompositeQuery.PanelType, + builderQuery, + keys, + tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} + return + } + placeholderQuery, err := tracesV3.PrepareTracesQuery( + params.Start, + params.End, + params.CompositeQuery.PanelType, + builderQuery, + keys, + tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} + return + } + query = fmt.Sprintf(placeholderQuery, limitQuery) + } else { + query, err = tracesV3.PrepareTracesQuery( + params.Start, + params.End, + params.CompositeQuery.PanelType, + builderQuery, + keys, + tracesV3.Options{PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + } + + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + return + } + + // What is happening here? + // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. + // If the query is not cached, we execute the query and return the result without caching it. + if _, ok := cacheKeys[queryName]; !ok { + query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM}) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + return + } + + cacheKey := cacheKeys[queryName] + var cachedData []byte + if !params.NoCache && q.cache != nil { + var retrieveStatus status.RetrieveStatus + data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) + zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + if err == nil { + cachedData = data + } + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + query, err := metricsV3.PrepareMetricQuery( + miss.start, + miss.end, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + metricsV3.Options{}, + ) + if err != nil { + ch <- channelResult{ + Err: err, + Name: queryName, + Query: query, + Series: nil, + } + return + } + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + ch <- channelResult{ + Err: err, + Name: queryName, + Query: query, + Series: nil, + } + return + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + zap.S().Error("error unmarshalling cached data", zap.Error(err)) + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + ch <- channelResult{ + Err: nil, + Name: queryName, + Series: mergedSeries, + } + // Cache the seriesList for future queries + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { + mergedSeriesData, err := json.Marshal(mergedSeries) + if err != nil { + zap.S().Error("error marshalling merged series", zap.Error(err)) + return + } + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + zap.S().Error("error storing merged series", zap.Error(err)) + return + } + } +} + +func (q *querier) runBuilderExpression( + ctx context.Context, + builderQuery *v3.BuilderQuery, + params *v3.QueryRangeParamsV3, + keys map[string]v3.AttributeKey, + cacheKeys map[string]string, + ch chan channelResult, + wg *sync.WaitGroup, +) { + defer wg.Done() + + queryName := builderQuery.QueryName + + queries, err := q.builder.PrepareQueries(params, keys) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: "", Series: nil} + return + } + + if _, ok := cacheKeys[queryName]; !ok { + query := queries[queryName] + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + return + } + + cacheKey := cacheKeys[queryName] + var cachedData []byte + if !params.NoCache && q.cache != nil { + var retrieveStatus status.RetrieveStatus + data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) + zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + if err == nil { + cachedData = data + } + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{ + Start: miss.start, + End: miss.end, + Step: params.Step, + NoCache: params.NoCache, + CompositeQuery: params.CompositeQuery, + Variables: params.Variables, + }, keys) + query := missQueries[queryName] + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + zap.S().Error("error unmarshalling cached data", zap.Error(err)) + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + ch <- channelResult{ + Err: nil, + Name: queryName, + Series: mergedSeries, + } + // Cache the seriesList for future queries + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { + mergedSeriesData, err := json.Marshal(mergedSeries) + if err != nil { + zap.S().Error("error marshalling merged series", zap.Error(err)) + return + } + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + zap.S().Error("error storing merged series", zap.Error(err)) + return + } + } +} diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index e0cea90126..f5c9e07113 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -7,20 +7,30 @@ import ( "math" "sort" "strings" + "sync" "time" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache" - "go.signoz.io/signoz/pkg/query-service/cache/status" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/multierr" "go.uber.org/zap" ) +type channelResult struct { + Series []*v3.Series + List []*v3.Row + Err error + Name string + Query string +} + type missInterval struct { start, end int64 // in milliseconds } @@ -32,6 +42,9 @@ type querier struct { fluxInterval time.Duration + builder *queryBuilder.QueryBuilder + featureLookUp interfaces.FeatureLookup + // used for testing // TODO(srikanthccv): remove this once we have a proper mock testingMode bool @@ -41,10 +54,11 @@ type querier struct { } type QuerierOptions struct { - Reader interfaces.Reader - Cache cache.Cache - KeyGenerator cache.KeyGenerator - FluxInterval time.Duration + Reader interfaces.Reader + Cache cache.Cache + KeyGenerator cache.KeyGenerator + FluxInterval time.Duration + FeatureLookup interfaces.FeatureLookup // used for testing TestingMode bool @@ -59,6 +73,13 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { keyGenerator: opts.KeyGenerator, fluxInterval: opts.FluxInterval, + builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{ + BuildTraceQuery: tracesV3.PrepareTracesQuery, + BuildLogQuery: logsV3.PrepareLogsQuery, + BuildMetricQuery: metricsV3.PrepareMetricQuery, + }, opts.FeatureLookup), + featureLookUp: opts.FeatureLookup, + testingMode: opts.TestingMode, returnedSeries: opts.ReturnedSeries, returnedErr: opts.ReturnedErr, @@ -160,7 +181,7 @@ func findMissingTimeRanges(start, end int64, seriesList []*v3.Series, fluxInterv var validMisses []missInterval for idx := range misses { miss := misses[idx] - if miss.start <= miss.end { + if miss.start < miss.end { validMisses = append(validMisses, miss) } } @@ -223,206 +244,246 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series { return mergedSeries } -func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) { +func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) { cacheKeys := q.keyGenerator.GenerateKeys(params) - seriesList := make([]*v3.Series, 0) - errQueriesByName := make(map[string]string) - var err error + ch := make(chan channelResult, len(params.CompositeQuery.BuilderQueries)) + var wg sync.WaitGroup for queryName, builderQuery := range params.CompositeQuery.BuilderQueries { - - // TODO: add support for logs and traces - if builderQuery.DataSource == v3.DataSourceLogs { - query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, logsV3.Options{}) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - series, err := q.execClickHouseQuery(ctx, query) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - seriesList = append(seriesList, series...) + if builderQuery.Disabled { continue } - - if builderQuery.DataSource == v3.DataSourceTraces { - query, err := tracesV3.PrepareTracesQuery(params.Start, params.End, params.CompositeQuery.PanelType, builderQuery, keys, tracesV3.Options{ - GraphLimitQtype: "", - PreferRPM: false, - }) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - - series, err := q.execClickHouseQuery(ctx, query) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - seriesList = append(seriesList, series...) - continue - } - - cacheKey := cacheKeys[queryName] - var cachedData []byte - if !params.NoCache { - var retrieveStatus status.RetrieveStatus - cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true) - zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err != nil { - return nil, err, nil - } - } - misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) - for _, miss := range misses { - query, err := metricsV3.PrepareMetricQuery( - miss.start, - miss.end, - params.CompositeQuery.QueryType, - params.CompositeQuery.PanelType, - builderQuery, - metricsV3.Options{PreferRPM: false}, - ) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - series, err := q.execClickHouseQuery(ctx, query) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - errQueriesByName[queryName] = err.Error() - continue - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - - seriesList = append(seriesList, mergedSeries...) - // Cache the seriesList for future queries - if len(missedSeries) > 0 { - mergedSeriesData, err := json.Marshal(mergedSeries) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } + wg.Add(1) + if queryName == builderQuery.Expression { + go q.runBuilderQuery(ctx, builderQuery, params, keys, cacheKeys, ch, &wg) + } else { + go q.runBuilderExpression(ctx, builderQuery, params, keys, cacheKeys, ch, &wg) } } - if len(errQueriesByName) > 0 { + + wg.Wait() + close(ch) + + results := make([]*v3.Result, 0) + errQueriesByName := make(map[string]string) + var errs []error + + for result := range ch { + if result.Err != nil { + errs = append(errs, result.Err) + errQueriesByName[result.Name] = result.Err.Error() + continue + } + results = append(results, &v3.Result{ + QueryName: result.Name, + Series: result.Series, + }) + } + + var err error + if len(errs) > 0 { err = fmt.Errorf("error in builder queries") } - return seriesList, err, errQueriesByName + + return results, err, errQueriesByName } -func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) { - seriesList := make([]*v3.Series, 0) - errQueriesByName := make(map[string]string) - var err error - for queryName, promQuery := range params.CompositeQuery.PromQueries { - cacheKey := q.keyGenerator.GenerateKeys(params)[queryName] - var cachedData []byte - var retrieveStatus status.RetrieveStatus - if !params.NoCache { - cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true) - zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) - } - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) - for _, miss := range misses { - query := metricsV3.BuildPromQuery( - promQuery, - params.Step, - miss.start, - miss.end, - ) - series, err := q.execPromQuery(ctx, query) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - errQueriesByName[queryName] = err.Error() - continue - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) +func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) { + channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries)) + var wg sync.WaitGroup + cacheKeys := q.keyGenerator.GenerateKeys(params) - seriesList = append(seriesList, mergedSeries...) - // Cache the seriesList for future queries - if len(missedSeries) > 0 { - mergedSeriesData, err := json.Marshal(mergedSeries) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } + for queryName, promQuery := range params.CompositeQuery.PromQueries { + if promQuery.Disabled { + continue } + wg.Add(1) + go func(queryName string, promQuery *v3.PromQuery) { + defer wg.Done() + cacheKey := cacheKeys[queryName] + var cachedData []byte + // Ensure NoCache is not set and cache is not nil + if !params.NoCache && q.cache != nil { + data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) + zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + if err == nil { + cachedData = data + } + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.start, miss.end) + series, err := q.execPromQuery(ctx, query) + if err != nil { + channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil} + return + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + // ideally we should not be getting an error here + zap.S().Error("error unmarshalling cached data", zap.Error(err)) + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries} + + // Cache the seriesList for future queries + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { + mergedSeriesData, err := json.Marshal(mergedSeries) + if err != nil { + zap.S().Error("error marshalling merged series", zap.Error(err)) + return + } + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + zap.S().Error("error storing merged series", zap.Error(err)) + return + } + } + }(queryName, promQuery) } - if len(errQueriesByName) > 0 { + wg.Wait() + close(channelResults) + + results := make([]*v3.Result, 0) + errQueriesByName := make(map[string]string) + var errs []error + + for result := range channelResults { + if result.Err != nil { + errs = append(errs, result.Err) + errQueriesByName[result.Name] = result.Err.Error() + continue + } + results = append(results, &v3.Result{ + QueryName: result.Name, + Series: result.Series, + }) + } + + var err error + if len(errs) > 0 { err = fmt.Errorf("error in prom queries") } - return seriesList, err, errQueriesByName + + return results, err, errQueriesByName } -func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) { - seriesList := make([]*v3.Series, 0) - errQueriesByName := make(map[string]string) - var err error +func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) { + channelResults := make(chan channelResult, len(params.CompositeQuery.ClickHouseQueries)) + var wg sync.WaitGroup for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries { - series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query) - if err != nil { - errQueriesByName[queryName] = err.Error() + if clickHouseQuery.Disabled { continue } - seriesList = append(seriesList, series...) + wg.Add(1) + go func(queryName string, clickHouseQuery *v3.ClickHouseQuery) { + defer wg.Done() + series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query) + channelResults <- channelResult{Err: err, Name: queryName, Query: clickHouseQuery.Query, Series: series} + }(queryName, clickHouseQuery) } - if len(errQueriesByName) > 0 { + wg.Wait() + close(channelResults) + + results := make([]*v3.Result, 0) + errQueriesByName := make(map[string]string) + var errs []error + + for result := range channelResults { + if result.Err != nil { + errs = append(errs, result.Err) + errQueriesByName[result.Name] = result.Err.Error() + continue + } + results = append(results, &v3.Result{ + QueryName: result.Name, + Series: result.Series, + }) + } + + var err error + if len(errs) > 0 { err = fmt.Errorf("error in clickhouse queries") } - return seriesList, err, errQueriesByName + return results, err, errQueriesByName } -func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) { - var seriesList []*v3.Series +func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) { + + queries, err := q.builder.PrepareQueries(params, keys) + + if err != nil { + return nil, err, nil + } + + ch := make(chan channelResult, len(queries)) + var wg sync.WaitGroup + + for name, query := range queries { + wg.Add(1) + go func(name, query string) { + defer wg.Done() + rowList, err := q.reader.GetListResultV3(ctx, query) + + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query} + return + } + ch <- channelResult{List: rowList, Name: name, Query: query} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + errQuriesByName := make(map[string]string) + res := make([]*v3.Result, 0) + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + errQuriesByName[r.Name] = r.Query + continue + } + res = append(res, &v3.Result{ + QueryName: r.Name, + List: r.List, + }) + } + if len(errs) != 0 { + return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName + } + return res, nil, nil +} + +func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) { + var results []*v3.Result var err error var errQueriesByName map[string]string if params.CompositeQuery != nil { switch params.CompositeQuery.QueryType { case v3.QueryTypeBuilder: - seriesList, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys) + if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace { + results, err, errQueriesByName = q.runBuilderListQueries(ctx, params, keys) + } else { + results, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys) + } case v3.QueryTypePromQL: - seriesList, err, errQueriesByName = q.runPromQueries(ctx, params) + results, err, errQueriesByName = q.runPromQueries(ctx, params) case v3.QueryTypeClickHouseSQL: - seriesList, err, errQueriesByName = q.runClickHouseQueries(ctx, params) + results, err, errQueriesByName = q.runClickHouseQueries(ctx, params) default: err = fmt.Errorf("invalid query type") } } - return seriesList, err, errQueriesByName + return results, err, errQueriesByName } func (q *querier) QueriesExecuted() []string { diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index 46855add7d..f4427a739a 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -406,9 +406,11 @@ func TestQueryRange(t *testing.T) { End: 1675115596722 + 120*60*1000, CompositeQuery: &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, BuilderQueries: map[string]*v3.BuilderQuery{ "A": { QueryName: "A", + DataSource: v3.DataSourceMetrics, StepInterval: 60, AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, Filters: &v3.FilterSet{ @@ -436,11 +438,13 @@ func TestQueryRange(t *testing.T) { End: 1675115596722 + 180*60*1000, CompositeQuery: &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, BuilderQueries: map[string]*v3.BuilderQuery{ "A": { QueryName: "A", StepInterval: 60, AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + DataSource: v3.DataSourceMetrics, Filters: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -461,6 +465,73 @@ func TestQueryRange(t *testing.T) { }, }, }, + // No caching for traces & logs yet + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + StepInterval: 60, + DataSource: v3.DataSourceTraces, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "serviceName", IsColumn: false}, + {Key: "name", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 180*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + StepInterval: 60, + DataSource: v3.DataSourceTraces, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "serviceName", IsColumn: false}, + {Key: "name", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + }, + }, + }, + }, } cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) opts := QuerierOptions{ @@ -489,6 +560,117 @@ func TestQueryRange(t *testing.T) { expectedTimeRangeInQueryString := []string{ fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000), fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", 1675115580000*1000000, (1675115580000+120*60*1000)*int64(1000000)), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)), + } + + for i, param := range params { + _, err, errByName := q.QueryRange(context.Background(), param, nil) + if err != nil { + t.Errorf("expected no error, got %s", err) + } + if len(errByName) > 0 { + t.Errorf("expected no error, got %v", errByName) + } + + if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) { + t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i]) + } + } +} + +func TestQueryRangeValueType(t *testing.T) { + // There shouldn't be any caching for value panel type + params := []*v3.QueryRangeParamsV3{ + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeValue, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "A", + ReduceTo: v3.ReduceToOperatorLast, + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 180*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeValue, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceTraces, + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + ReduceTo: v3.ReduceToOperatorLast, + }, + }, + }, + }, + } + cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute}) + opts := QuerierOptions{ + Cache: cache, + Reader: nil, + FluxInterval: 5 * time.Minute, + KeyGenerator: queryBuilder.NewKeyGenerator(), + + TestingMode: true, + ReturnedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "method": "GET", + "service_name": "test", + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + {Timestamp: 1675115596722, Value: 1}, + {Timestamp: 1675115596722 + 60*60*1000, Value: 2}, + {Timestamp: 1675115596722 + 120*60*1000, Value: 3}, + }, + }, + }, + } + q := NewQuerier(opts) + // No caching + expectedTimeRangeInQueryString := []string{ + fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)), } for i, param := range params { diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go index 690ba6fcc2..328b519289 100644 --- a/pkg/query-service/app/queryBuilder/query_builder.go +++ b/pkg/query-service/app/queryBuilder/query_builder.go @@ -275,12 +275,35 @@ func expressionToKey(expression *govaluate.EvaluableExpression, keys map[string] return formula.ExpressionString() } +func isMetricExpression(expression *govaluate.EvaluableExpression, params *v3.QueryRangeParamsV3) bool { + variables := unique(expression.Vars()) + for _, variable := range variables { + if params.CompositeQuery.BuilderQueries[variable].DataSource != v3.DataSourceMetrics { + return false + } + } + return true +} + func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string { keys := make(map[string]string) + // For non-graph panels, we don't support caching + if params.CompositeQuery.PanelType != v3.PanelTypeGraph { + return keys + } + + // Use query as the cache key for PromQL queries + if params.CompositeQuery.QueryType == v3.QueryTypePromQL { + for name, query := range params.CompositeQuery.PromQueries { + keys[name] = query.Query + } + return keys + } + // Build keys for each builder query for queryName, query := range params.CompositeQuery.BuilderQueries { - if query.Expression == queryName { + if query.Expression == queryName && query.DataSource == v3.DataSourceMetrics { var parts []string // We need to build uniqe cache query for BuilderQuery @@ -321,6 +344,10 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri if query.Expression != query.QueryName { expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs) + if !isMetricExpression(expression, params) { + continue + } + expressionCacheKey := expressionToKey(expression, keys) keys[query.QueryName] = expressionCacheKey } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index d2a14e8ff8..69f3a9367f 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -22,11 +22,12 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" - opamp "go.signoz.io/signoz/pkg/query-service/app/opamp" + "go.signoz.io/signoz/pkg/query-service/app/opamp" opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.signoz.io/signoz/pkg/query-service/app/explorer" "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/dao" "go.signoz.io/signoz/pkg/query-service/featureManager" @@ -54,6 +55,8 @@ type ServerOptions struct { MaxIdleConns int MaxOpenConns int DialTimeout time.Duration + CacheConfigPath string + FluxInterval string } // Server runs HTTP, Mux and a grpc server @@ -134,6 +137,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } + + fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) // ingestion pipelines manager logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite") if err != nil { @@ -153,6 +166,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { RuleManager: rm, FeatureFlags: fm, LogsParsingPipelineController: logParsingPipelineController, + Cache: c, + FluxInterval: fluxInterval, }) if err != nil { return nil, err diff --git a/pkg/query-service/config/cache-config.yml b/pkg/query-service/config/cache-config.yml new file mode 100644 index 0000000000..b1bd329584 --- /dev/null +++ b/pkg/query-service/config/cache-config.yml @@ -0,0 +1,4 @@ +provider: "inmemory" +inmemory: + ttl: 60m + cleanupInterval: 10m diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 8b3697f89c..15cc4868b3 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -100,7 +100,7 @@ type Reader interface { } type Querier interface { - QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) + QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) // test helpers QueriesExecuted() []string diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index c0fdc3d5d2..76382b10c0 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -33,7 +33,7 @@ func main() { var disableRules bool // the url used to build link in the alert messages in slack and other systems - var ruleRepoURL string + var ruleRepoURL, cacheConfigPath, fluxInterval string var preferDelta bool var preferSpanMetrics bool @@ -51,6 +51,8 @@ func main() { flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)") flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)") flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)") + flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") + flag.StringVar(&fluxInterval, "flux-interval", "5m", "(cache config to use)") flag.Parse() loggerMgr := initZapLog() @@ -72,6 +74,8 @@ func main() { MaxIdleConns: maxIdleConns, MaxOpenConns: maxOpenConns, DialTimeout: dialTimeout, + CacheConfigPath: cacheConfigPath, + FluxInterval: fluxInterval, } // Read the jwt secret key diff --git a/pkg/query-service/model/db.go b/pkg/query-service/model/db.go index 9dcefd7ee0..f1d7817fc7 100644 --- a/pkg/query-service/model/db.go +++ b/pkg/query-service/model/db.go @@ -53,7 +53,6 @@ func (uf UserFlag) Value() (driver.Value, error) { } func (uf *UserFlag) Scan(value interface{}) error { - fmt.Println(" value:", value) if value == "" { return nil } diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 429471616f..754de3eae0 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -136,6 +136,7 @@ type QueryRangeParamsV2 struct { Step int64 `json:"step"` CompositeMetricQuery *CompositeMetricQuery `json:"compositeMetricQuery"` Variables map[string]interface{} `json:"variables,omitempty"` + NoCache bool `json:"noCache"` } type DashboardVars struct { diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 46828b66c1..3ca7126636 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "math" + "sort" "strconv" "time" @@ -494,6 +495,12 @@ type Series struct { Points []MetricPoint `json:"values"` } +func (s *Series) SortPoints() { + sort.Slice(s.Points, func(i, j int) bool { + return s.Points[i].Timestamp < s.Points[j].Timestamp + }) +} + type MetricPoint struct { Timestamp int64 Value float64 @@ -505,6 +512,17 @@ func (p *MetricPoint) MarshalJSON() ([]byte, error) { return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v}) } +// UnmarshalJSON implements json.Unmarshaler. +func (p *MetricPoint) UnmarshalJSON(b []byte) error { + var a [2]interface{} + if err := json.Unmarshal(b, &a); err != nil { + return err + } + p.Timestamp = int64(a[0].(float64) * 1000) + p.Value, _ = strconv.ParseFloat(a[1].(string), 64) + return nil +} + type ShowCreateTableStatement struct { Statement string `json:"statement" ch:"statement"` }