diff --git a/Makefile b/Makefile index 1761acd26d..05b6cb27e6 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ LOCAL_GOOS ?= $(shell go env GOOS) LOCAL_GOARCH ?= $(shell go env GOARCH) REPONAME ?= signoz -DOCKER_TAG ?= latest +DOCKER_TAG ?= $(subst v,,$(BUILD_VERSION)) FRONTEND_DOCKER_IMAGE ?= frontend QUERY_SERVICE_DOCKER_IMAGE ?= query-service diff --git a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml index 964411f6e6..026ef2d598 100644 --- a/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker-swarm/clickhouse-setup/docker-compose.yaml @@ -144,7 +144,7 @@ services: condition: on-failure query-service: - image: signoz/query-service:0.29.1 + image: signoz/query-service:0.29.2 command: [ "-config=/root/config/prometheus.yml", @@ -184,7 +184,7 @@ services: <<: *clickhouse-depend frontend: - image: signoz/frontend:0.29.1 + image: signoz/frontend:0.29.2 deploy: restart_policy: condition: on-failure diff --git a/deploy/docker/clickhouse-setup/docker-compose.yaml b/deploy/docker/clickhouse-setup/docker-compose.yaml index dbe8d4326d..a0f758faa8 100644 --- a/deploy/docker/clickhouse-setup/docker-compose.yaml +++ b/deploy/docker/clickhouse-setup/docker-compose.yaml @@ -162,7 +162,7 @@ services: # Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md` query-service: - image: signoz/query-service:${DOCKER_TAG:-0.29.1} + image: signoz/query-service:${DOCKER_TAG:-0.29.2} container_name: signoz-query-service command: [ @@ -201,7 +201,7 @@ services: <<: *clickhouse-depend frontend: - image: signoz/frontend:${DOCKER_TAG:-0.29.1} + image: signoz/frontend:${DOCKER_TAG:-0.29.2} container_name: signoz-frontend restart: on-failure depends_on: diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 9239be2d99..5cdca8e204 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -10,6 +10,7 @@ import ( "go.signoz.io/signoz/ee/query-service/license" baseapp "go.signoz.io/signoz/pkg/query-service/app" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" + "go.signoz.io/signoz/pkg/query-service/cache" baseint "go.signoz.io/signoz/pkg/query-service/interfaces" basemodel "go.signoz.io/signoz/pkg/query-service/model" rules "go.signoz.io/signoz/pkg/query-service/rules" @@ -29,6 +30,9 @@ type APIHandlerOptions struct { FeatureFlags baseint.FeatureLookup LicenseManager *license.Manager LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController + Cache cache.Cache + // Querier Influx Interval + FluxInterval time.Duration } type APIHandler struct { @@ -51,6 +55,8 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { RuleManager: opts.RulesManager, FeatureFlags: opts.FeatureFlags, LogsParsingPipelineController: opts.LogsParsingPipelineController, + Cache: opts.Cache, + FluxInterval: opts.FluxInterval, }) if err != nil { diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index aee2c87160..e36d201ec3 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -35,6 +35,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/opamp" opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" baseauth "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/cache" baseconst "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/healthcheck" basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" @@ -62,6 +63,8 @@ type ServerOptions struct { MaxIdleConns int MaxOpenConns int DialTimeout time.Duration + CacheConfigPath string + FluxInterval string } // Server runs HTTP api service @@ -189,6 +192,21 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { telemetry.GetInstance().SetReader(reader) + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } + + fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) + + if err != nil { + return nil, err + } + apiOpts := api.APIHandlerOptions{ DataConnector: reader, SkipConfig: skipConfig, @@ -202,6 +220,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { FeatureFlags: lm, LicenseManager: lm, LogsParsingPipelineController: logParsingPipelineController, + Cache: c, + FluxInterval: fluxInterval, } apiHandler, err := api.NewAPIHandler(apiOpts) diff --git a/ee/query-service/main.go b/ee/query-service/main.go index dcdedeb9db..d9b90340ae 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -82,6 +82,7 @@ func main() { // the url used to build link in the alert messages in slack and other systems var ruleRepoURL string + var cacheConfigPath, fluxInterval string var enableQueryServiceLogOTLPExport bool var preferDelta bool var preferSpanMetrics bool @@ -99,10 +100,14 @@ func main() { flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)") flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)") flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)") + flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") + flag.StringVar(&fluxInterval, "flux-interval", "5m", "(cache config to use)") flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)") + flag.Parse() loggerMgr := initZapLog(enableQueryServiceLogOTLPExport) + zap.ReplaceGlobals(loggerMgr) defer loggerMgr.Sync() // flushes buffer, if any @@ -121,6 +126,8 @@ func main() { MaxIdleConns: maxIdleConns, MaxOpenConns: maxOpenConns, DialTimeout: dialTimeout, + CacheConfigPath: cacheConfigPath, + FluxInterval: fluxInterval, } // Read the jwt secret key diff --git a/frontend/src/AppRoutes/index.tsx b/frontend/src/AppRoutes/index.tsx index 11f9d79d51..97b77917fa 100644 --- a/frontend/src/AppRoutes/index.tsx +++ b/frontend/src/AppRoutes/index.tsx @@ -10,7 +10,7 @@ import { NotificationProvider } from 'hooks/useNotifications'; import { ResourceProvider } from 'hooks/useResourceAttribute'; import history from 'lib/history'; import { QueryBuilderProvider } from 'providers/QueryBuilder'; -import { Suspense, useState } from 'react'; +import { Suspense, useEffect, useState } from 'react'; import { useDispatch, useSelector } from 'react-redux'; import { Route, Router, Switch } from 'react-router-dom'; import { Dispatch } from 'redux'; @@ -18,6 +18,7 @@ import { AppState } from 'store/reducers'; import AppActions from 'types/actions'; import { UPDATE_FEATURE_FLAG_RESPONSE } from 'types/actions/app'; import AppReducer from 'types/reducer/app'; +import { trackPageView } from 'utils/segmentAnalytics'; import PrivateRoute from './Private'; import defaultRoutes from './routes'; @@ -32,7 +33,7 @@ function App(): JSX.Element { const dispatch = useDispatch>(); - const { hostname } = window.location; + const { hostname, pathname } = window.location; const featureResponse = useGetFeatureFlag((allFlags) => { const isOnboardingEnabled = @@ -73,6 +74,20 @@ function App(): JSX.Element { } }); + useEffect(() => { + if (isLoggedInState && user && user.userId && user.email) { + window.analytics.identify(user?.userId, { + email: user?.email || '', + name: user?.name || '', + }); + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [isLoggedInState]); + + useEffect(() => { + trackPageView(pathname); + }, [pathname]); + return ( diff --git a/frontend/src/container/LocalTopNav/index.tsx b/frontend/src/container/LocalTopNav/index.tsx index 40010f5784..3de2f823ef 100644 --- a/frontend/src/container/LocalTopNav/index.tsx +++ b/frontend/src/container/LocalTopNav/index.tsx @@ -1,4 +1,5 @@ import { Col, Row, Space } from 'antd'; +import NewExplorerCTA from 'container/NewExplorerCTA'; import ShowBreadcrumbs from '../TopNav/Breadcrumbs'; import DateTimeSelector from '../TopNav/DateTimeSelection'; @@ -18,6 +19,7 @@ function LocalTopNav({ + {actions} {renderPermissions?.isDateTimeEnabled && (
diff --git a/frontend/src/container/LogDetailedView/JsonView.tsx b/frontend/src/container/LogDetailedView/JsonView.tsx index e510d46d10..c3ab659567 100644 --- a/frontend/src/container/LogDetailedView/JsonView.tsx +++ b/frontend/src/container/LogDetailedView/JsonView.tsx @@ -4,14 +4,17 @@ import { Button, Row } from 'antd'; import Editor from 'components/Editor'; import { useMemo } from 'react'; import { useCopyToClipboard } from 'react-use'; -import { ILog } from 'types/api/logs/log'; -interface JSONViewProps { - logData: ILog; -} +import { JSONViewProps } from './LogDetailedView.types'; +import { aggregateAttributesResourcesToString } from './utils'; + function JSONView({ logData }: JSONViewProps): JSX.Element { const [, copyToClipboard] = useCopyToClipboard(); - const LogJsonData = useMemo(() => JSON.stringify(logData, null, 2), [logData]); + + const LogJsonData = useMemo( + () => aggregateAttributesResourcesToString(logData), + [logData], + ); return (
=> { if (typeof value === 'string') { return recursiveParseJSON(value); } + if (typeof value === 'object') { + Object.entries(value).forEach(([key, val]) => { + if (typeof val === 'string') { + value[key] = val.trim(); + } else if (typeof val === 'object') { + value[key] = recursiveParseJSON(JSON.stringify(val)); + } + }); + } return value; } catch (e) { return {}; @@ -174,3 +184,35 @@ export const getFieldAttributes = (field: string): IFieldAttributes => { return { dataType, newField, logType }; }; + +export const aggregateAttributesResourcesToString = (logData: ILog): string => { + const outputJson: ILogAggregateAttributesResources = { + body: logData.body, + date: logData.date, + id: logData.id, + severityNumber: logData.severityNumber, + severityText: logData.severityText, + spanId: logData.spanId, + timestamp: logData.timestamp, + traceFlags: logData.traceFlags, + traceId: logData.traceId, + attributes: {}, + resources: {}, + }; + + Object.keys(logData).forEach((key) => { + if (key.startsWith('attributes_')) { + outputJson.attributes = outputJson.attributes || {}; + Object.assign(outputJson.attributes, logData[key as keyof ILog]); + } else if (key.startsWith('resources_')) { + outputJson.resources = outputJson.resources || {}; + Object.assign(outputJson.resources, logData[key as keyof ILog]); + } else { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + outputJson[key] = logData[key as keyof ILog]; + } + }); + + return JSON.stringify(outputJson, null, 2); +}; diff --git a/frontend/src/container/NewExplorerCTA/config.ts b/frontend/src/container/NewExplorerCTA/config.ts index 783f576142..b2feeff572 100644 --- a/frontend/src/container/NewExplorerCTA/config.ts +++ b/frontend/src/container/NewExplorerCTA/config.ts @@ -1,3 +1,11 @@ +import ROUTES from 'constants/routes'; + export const RIBBON_STYLES = { top: '-0.75rem', }; + +export const buttonText = { + [ROUTES.LOGS_EXPLORER]: 'Switch to Old Logs Explorer', + [ROUTES.TRACE]: 'Try new Traces Explorer', + [ROUTES.LOGS]: 'Switch to New Logs Explorer', +}; diff --git a/frontend/src/container/NewExplorerCTA/index.tsx b/frontend/src/container/NewExplorerCTA/index.tsx index 93283b34a8..f93d2e636f 100644 --- a/frontend/src/container/NewExplorerCTA/index.tsx +++ b/frontend/src/container/NewExplorerCTA/index.tsx @@ -2,47 +2,57 @@ import { CompassOutlined } from '@ant-design/icons'; import { Badge, Button } from 'antd'; import ROUTES from 'constants/routes'; import history from 'lib/history'; -import { useMemo } from 'react'; +import { useCallback, useMemo } from 'react'; import { useLocation } from 'react-router-dom'; -import { RIBBON_STYLES } from './config'; +import { buttonText, RIBBON_STYLES } from './config'; function NewExplorerCTA(): JSX.Element | null { const location = useLocation(); const isTraceOrLogsExplorerPage = useMemo( - () => location.pathname === ROUTES.LOGS || location.pathname === ROUTES.TRACE, + () => + location.pathname === ROUTES.LOGS_EXPLORER || + location.pathname === ROUTES.TRACE || + location.pathname === ROUTES.LOGS, [location.pathname], ); - const onClickHandler = (): void => { - if (location.pathname === ROUTES.LOGS) { - history.push(ROUTES.LOGS_EXPLORER); + const onClickHandler = useCallback((): void => { + if (location.pathname === ROUTES.LOGS_EXPLORER) { + history.push(ROUTES.LOGS); } else if (location.pathname === ROUTES.TRACE) { history.push(ROUTES.TRACES_EXPLORER); + } else if (location.pathname === ROUTES.LOGS) { + history.push(ROUTES.LOGS_EXPLORER); } - }; + }, [location.pathname]); - const buttonText = useMemo( - () => - `Try new ${ROUTES.LOGS === location.pathname ? 'Logs' : 'Traces'} Explorer`, - [location.pathname], - ); - - if (!isTraceOrLogsExplorerPage) { - return null; - } - - return ( - + const button = useMemo( + () => ( + ), + [location.pathname, onClickHandler], + ); + + if (!isTraceOrLogsExplorerPage) { + return null; + } + + if (location.pathname === ROUTES.LOGS_EXPLORER) { + return button; + } + + return ( + + {button} ); } diff --git a/frontend/src/container/QueryBuilder/components/Query/Query.tsx b/frontend/src/container/QueryBuilder/components/Query/Query.tsx index 9f4fca9b12..8da2c14335 100644 --- a/frontend/src/container/QueryBuilder/components/Query/Query.tsx +++ b/frontend/src/container/QueryBuilder/components/Query/Query.tsx @@ -151,18 +151,16 @@ export const Query = memo(function Query({ case PANEL_TYPES.TIME_SERIES: { return ( <> - {!isMetricsDataSource && ( - - - - - - - - - - - )} + + + + + + + + + + @@ -173,16 +171,14 @@ export const Query = memo(function Query({ - {!isMetricsDataSource && ( - - - - - - {renderOrderByFilter()} - - - )} + + + + + + {renderOrderByFilter()} + + {renderAggregateEveryFilter()} @@ -251,7 +247,6 @@ export const Query = memo(function Query({ } }, [ panelType, - isMetricsDataSource, query, filterConfigs?.limit?.isHidden, filterConfigs?.having?.isHidden, diff --git a/frontend/src/container/SideNav/menuItems.tsx b/frontend/src/container/SideNav/menuItems.tsx index dd5c271fad..f1305f3d19 100644 --- a/frontend/src/container/SideNav/menuItems.tsx +++ b/frontend/src/container/SideNav/menuItems.tsx @@ -31,7 +31,7 @@ const menuItems: SidebarMenu[] = [ icon: , }, { - key: ROUTES.LOGS, + key: ROUTES.LOGS_EXPLORER, label: 'Logs', icon: , }, diff --git a/frontend/src/container/TraceDetail/SelectedSpanDetails/config.ts b/frontend/src/container/TraceDetail/SelectedSpanDetails/config.ts new file mode 100644 index 0000000000..46d0a5fc90 --- /dev/null +++ b/frontend/src/container/TraceDetail/SelectedSpanDetails/config.ts @@ -0,0 +1,71 @@ +import { initialAutocompleteData, OPERATORS } from 'constants/queryBuilder'; +import getStep from 'lib/getStep'; +import { + BaseAutocompleteData, + DataTypes, +} from 'types/api/queryBuilder/queryAutocompleteResponse'; +import { Query, TagFilter } from 'types/api/queryBuilder/queryBuilderData'; +import { EQueryType } from 'types/common/dashboard'; +import { DataSource, LogsAggregatorOperator } from 'types/common/queryBuilder'; +import { v4 as uuid } from 'uuid'; + +export const getTraceToLogsQuery = ( + traceId: string, + minTime: number, + maxTime: number, +): Query => { + const key: BaseAutocompleteData = { + id: uuid(), + dataType: DataTypes.String, + isColumn: true, + type: '', + isJSON: false, + key: 'trace_id', + }; + + const filters: TagFilter = { + items: [ + { + id: uuid(), + op: OPERATORS.IN, + value: traceId, + key, + }, + ], + op: 'AND', + }; + + const query: Query = { + id: uuid(), + queryType: EQueryType.QUERY_BUILDER, + clickhouse_sql: [], + promql: [], + builder: { + queryData: [ + { + filters, + dataSource: DataSource.LOGS, + disabled: false, + limit: null, + aggregateAttribute: initialAutocompleteData, + aggregateOperator: LogsAggregatorOperator.NOOP, + expression: 'A', + groupBy: [], + having: [], + legend: '', + orderBy: [], + queryName: 'A', + reduceTo: 'min', + stepInterval: getStep({ + start: minTime, + end: maxTime, + inputFormat: 'ns', + }), + }, + ], + queryFormulas: [], + }, + }; + + return query; +}; diff --git a/frontend/src/container/TraceDetail/SelectedSpanDetails/index.tsx b/frontend/src/container/TraceDetail/SelectedSpanDetails/index.tsx index db9fde5ecb..5afca32311 100644 --- a/frontend/src/container/TraceDetail/SelectedSpanDetails/index.tsx +++ b/frontend/src/container/TraceDetail/SelectedSpanDetails/index.tsx @@ -1,13 +1,19 @@ import { Button, Modal, Tabs, Tooltip, Typography } from 'antd'; import Editor from 'components/Editor'; import { StyledSpace } from 'components/Styled'; +import { QueryParams } from 'constants/query'; import ROUTES from 'constants/routes'; import { useIsDarkMode } from 'hooks/useDarkMode'; +import createQueryParams from 'lib/createQueryParams'; import history from 'lib/history'; import { useMemo, useState } from 'react'; +import { useSelector } from 'react-redux'; import { useParams } from 'react-router-dom'; +import { AppState } from 'store/reducers'; import { ITraceTree } from 'types/api/trace/getTraceItem'; +import { GlobalReducer } from 'types/reducer/globalTime'; +import { getTraceToLogsQuery } from './config'; import Events from './Events'; import { CardContainer, @@ -21,6 +27,10 @@ import Tags from './Tags'; function SelectedSpanDetails(props: SelectedSpanDetailsProps): JSX.Element { const { tree, firstSpanStartTime } = props; + const { maxTime, minTime } = useSelector( + (state) => state.globalTime, + ); + const { id: traceId } = useParams(); const isDarkMode = useIsDarkMode(); @@ -75,9 +85,15 @@ function SelectedSpanDetails(props: SelectedSpanDetailsProps): JSX.Element { ]; const onLogsHandler = (): void => { - const query = encodeURIComponent(`trace_id IN ('${traceId}')`); + const query = getTraceToLogsQuery(traceId, minTime, maxTime); - history.push(`${ROUTES.LOGS}?q=${query}`); + history.push( + `${ROUTES.LOGS_EXPLORER}?${createQueryParams({ + [QueryParams.compositeQuery]: JSON.stringify(query), + [QueryParams.startTime]: minTime, + [QueryParams.endTime]: maxTime, + })}`, + ); }; return ( diff --git a/frontend/src/hooks/queryBuilder/useQueryOperations.ts b/frontend/src/hooks/queryBuilder/useQueryOperations.ts index 0d2f8f0cf2..3de26f1f94 100644 --- a/frontend/src/hooks/queryBuilder/useQueryOperations.ts +++ b/frontend/src/hooks/queryBuilder/useQueryOperations.ts @@ -7,7 +7,7 @@ import { import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder'; import { getOperatorsBySourceAndPanelType } from 'lib/newQueryBuilder/getOperatorsBySourceAndPanelType'; import { findDataTypeOfOperator } from 'lib/query/findDataTypeOfOperator'; -import { useCallback, useEffect, useMemo, useState } from 'react'; +import { useCallback, useEffect, useState } from 'react'; import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse'; import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData'; import { @@ -148,13 +148,9 @@ export const useQueryOperations: UseQueryOperations = ({ [query, index, handleSetQueryData], ); - const isMetricsDataSource = useMemo( - () => query.dataSource === DataSource.METRICS, - [query.dataSource], - ); - const isTracePanelType = useMemo(() => panelType === PANEL_TYPES.TRACE, [ - panelType, - ]); + const isMetricsDataSource = query.dataSource === DataSource.METRICS; + + const isTracePanelType = panelType === PANEL_TYPES.TRACE; useEffect(() => { if (initialDataSource && dataSource !== initialDataSource) return; diff --git a/frontend/src/index.html.ejs b/frontend/src/index.html.ejs index 620d44d92e..8fe2ea80bb 100644 --- a/frontend/src/index.html.ejs +++ b/frontend/src/index.html.ejs @@ -99,5 +99,72 @@ } })(); + + diff --git a/frontend/src/types/api/logs/log.ts b/frontend/src/types/api/logs/log.ts index 77b090a0d7..0b519936a6 100644 --- a/frontend/src/types/api/logs/log.ts +++ b/frontend/src/types/api/logs/log.ts @@ -14,3 +14,20 @@ export interface ILog { attributesInt: Record; attributesFloat: Record; } + +type OmitAttributesResources = Pick< + ILog, + Exclude< + keyof ILog, + | 'resources_string' + | 'attributesString' + | 'attributes_string' + | 'attributesInt' + | 'attributesFloat' + > +>; + +export type ILogAggregateAttributesResources = OmitAttributesResources & { + attributes: Record; + resources: Record; +}; diff --git a/frontend/src/typings/window.ts b/frontend/src/typings/window.ts index b27f8a85e0..a05152e5a5 100644 --- a/frontend/src/typings/window.ts +++ b/frontend/src/typings/window.ts @@ -3,7 +3,7 @@ import { compose, Store } from 'redux'; declare global { interface Window { store: Store; - + analytics: Record; __REDUX_DEVTOOLS_EXTENSION_COMPOSE__: typeof compose; } } diff --git a/frontend/src/utils/segmentAnalytics.ts b/frontend/src/utils/segmentAnalytics.ts new file mode 100644 index 0000000000..d776c34c3c --- /dev/null +++ b/frontend/src/utils/segmentAnalytics.ts @@ -0,0 +1,12 @@ +function trackPageView(pageName: string): void { + window.analytics.page(pageName); +} + +function trackEvent( + eventName: string, + properties: Record, +): void { + window.analytics.track(eventName, properties); +} + +export { trackEvent, trackPageView }; diff --git a/frontend/webpack.config.js b/frontend/webpack.config.js index 770e40ca30..8690e5e55f 100644 --- a/frontend/webpack.config.js +++ b/frontend/webpack.config.js @@ -20,6 +20,7 @@ const plugins = [ new HtmlWebpackPlugin({ template: 'src/index.html.ejs', INTERCOM_APP_ID: process.env.INTERCOM_APP_ID, + SEGMENT_ID: process.env.SEGMENT_ID, }), new webpack.ProvidePlugin({ process: 'process/browser', @@ -29,6 +30,7 @@ const plugins = [ NODE_ENV: process.env.NODE_ENV, FRONTEND_API_ENDPOINT: process.env.FRONTEND_API_ENDPOINT, INTERCOM_APP_ID: process.env.INTERCOM_APP_ID, + SEGMENT_ID: process.env.SEGMENT_ID, }), }), ]; diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 3c85c07166..2d4b4c33ac 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -28,9 +28,11 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/metrics" metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/app/parser" + "go.signoz.io/signoz/pkg/query-service/app/querier" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate" @@ -51,8 +53,9 @@ import ( type status string const ( - statusSuccess status = "success" - statusError status = "error" + statusSuccess status = "success" + statusError status = "error" + defaultFluxInterval = 5 * time.Minute ) // NewRouter creates and configures a Gorilla Router. @@ -73,6 +76,7 @@ type APIHandler struct { ruleManager *rules.Manager featureFlags interfaces.FeatureLookup ready func(http.HandlerFunc) http.HandlerFunc + querier interfaces.Querier queryBuilder *queryBuilder.QueryBuilder preferDelta bool preferSpanMetrics bool @@ -114,6 +118,11 @@ type APIHandlerOpts struct { // Log parsing pipelines LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController + // cache + Cache cache.Cache + + // Querier Influx Interval + FluxInterval time.Duration } // NewAPIHandler returns an APIHandler @@ -124,6 +133,16 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { return nil, err } + querierOpts := querier.QuerierOptions{ + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + } + + querier := querier.NewQuerier(querierOpts) + aH := &APIHandler{ reader: opts.Reader, appDao: opts.AppDao, @@ -137,6 +156,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { ruleManager: opts.RuleManager, featureFlags: opts.FeatureFlags, LogsParsingPipelineController: opts.LogsParsingPipelineController, + querier: querier, } builderOpts := queryBuilder.QueryBuilderOptions{ @@ -2965,9 +2985,8 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que var result []*v3.Result var err error var errQuriesByName map[string]string - var queries map[string]string - switch queryRangeParams.CompositeQuery.QueryType { - case v3.QueryTypeBuilder: + var spanKeys map[string]v3.AttributeKey + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder { // check if any enrichment is required for logs if yes then enrich them if logsv3.EnrichmentRequired(queryRangeParams) { // get the fields if any logs query is present @@ -2981,42 +3000,16 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que logsv3.Enrich(queryRangeParams, fields) } - var spanKeys map[string]v3.AttributeKey spanKeys, err = aH.getSpanKeysV3(ctx, queryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err} RespondError(w, apiErrObj, errQuriesByName) return } - - queries, err = aH.queryBuilder.PrepareQueries(queryRangeParams, spanKeys) - if err != nil { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil) - return - } - - if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeList || queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTrace { - result, err, errQuriesByName = aH.execClickHouseListQueries(r.Context(), queries) - } else { - result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries) - } - case v3.QueryTypeClickHouseSQL: - queries := make(map[string]string) - for name, query := range queryRangeParams.CompositeQuery.ClickHouseQueries { - if query.Disabled { - continue - } - queries[name] = query.Query - } - result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries) - case v3.QueryTypePromQL: - result, err, errQuriesByName = aH.execPromQueries(r.Context(), queryRangeParams) - default: - err = fmt.Errorf("invalid query type") - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName) - return } + result, err, errQuriesByName = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys) + if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} RespondError(w, apiErrObj, errQuriesByName) @@ -3062,7 +3055,7 @@ func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParam limit := builderQueries[result.QueryName].Limit orderByList := builderQueries[result.QueryName].OrderBy - if limit != 0 { + if limit >= 0 { if len(orderByList) == 0 { // If no orderBy is specified, sort by value in descending order orderByList = []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "desc"}} @@ -3070,6 +3063,18 @@ func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParam sort.SliceStable(result.Series, func(i, j int) bool { for _, orderBy := range orderByList { if orderBy.ColumnName == constants.SigNozOrderByValue { + + // For table type queries (we rely on the fact that one value for row), sort + // based on final aggregation value + if len(result.Series[i].Points) == 1 && len(result.Series[j].Points) == 1 { + if orderBy.Order == "asc" { + return result.Series[i].Points[0].Value < result.Series[j].Points[0].Value + } else if orderBy.Order == "desc" { + return result.Series[i].Points[0].Value > result.Series[j].Points[0].Value + } + } + + // For graph type queries, sort based on GroupingSetsPoint if result.Series[i].GroupingSetsPoint == nil || result.Series[j].GroupingSetsPoint == nil { // Handle nil GroupingSetsPoint, if needed // Here, we assume non-nil values are always less than nil values @@ -3102,7 +3107,7 @@ func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParam return i < j }) - if len(result.Series) > int(limit) { + if limit > 0 && len(result.Series) > int(limit) { result.Series = result.Series[:limit] } } diff --git a/pkg/query-service/app/metrics/query_builder.go b/pkg/query-service/app/metrics/query_builder.go index 00f9fb3788..02e0022a0e 100644 --- a/pkg/query-service/app/metrics/query_builder.go +++ b/pkg/query-service/app/metrics/query_builder.go @@ -497,7 +497,7 @@ func PromFormattedValue(v interface{}) string { case float32, float64: return fmt.Sprintf("%f", x) case string: - return fmt.Sprintf("%s", x) + return x case bool: return fmt.Sprintf("%v", x) case []interface{}: @@ -506,7 +506,12 @@ func PromFormattedValue(v interface{}) string { } switch x[0].(type) { case string, int, float32, float64, bool: - return strings.Trim(strings.Join(strings.Fields(fmt.Sprint(x)), "|"), "[]") + // list of values joined by | for promql - a value can contain whitespace + var str []string + for _, sVal := range x { + str = append(str, fmt.Sprintf("%v", sVal)) + } + return strings.Join(str, "|") default: zap.L().Error("invalid type for prom formatted value", zap.Any("type", reflect.TypeOf(x[0]))) return "" diff --git a/pkg/query-service/app/metrics/v3/query_builder_test.go b/pkg/query-service/app/metrics/v3/query_builder_test.go index d315c52081..3f91641f30 100644 --- a/pkg/query-service/app/metrics/v3/query_builder_test.go +++ b/pkg/query-service/app/metrics/v3/query_builder_test.go @@ -82,6 +82,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) { }, "B": { QueryName: "B", + StepInterval: 60, AggregateAttribute: v3.AttributeKey{Key: "name2"}, AggregateOperator: v3.AggregateOperatorRateMax, Expression: "B", diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 5c44ba19f4..a957ef85e7 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -844,7 +844,6 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ dataSource := v3.DataSource(r.URL.Query().Get("dataSource")) aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator")) aggregateAttribute := r.URL.Query().Get("aggregateAttribute") - limit, err := strconv.Atoi(r.URL.Query().Get("limit")) if err != nil { limit = 50 @@ -1045,5 +1044,29 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE } } + // replace go template variables in prometheus query + if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL { + for _, promQuery := range queryRangeParams.CompositeQuery.PromQueries { + if promQuery.Disabled { + continue + } + tmpl := template.New("prometheus-query") + tmpl, err := tmpl.Parse(promQuery.Query) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} + } + var query bytes.Buffer + + // replace go template variables + querytemplate.AssignReservedVarsV3(queryRangeParams) + + err = tmpl.Execute(&query, queryRangeParams.Variables) + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err} + } + promQuery.Query = query.String() + } + } + return queryRangeParams, nil } diff --git a/pkg/query-service/app/parser_test.go b/pkg/query-service/app/parser_test.go index dd668aef2a..65ea226909 100644 --- a/pkg/query-service/app/parser_test.go +++ b/pkg/query-service/app/parser_test.go @@ -757,3 +757,108 @@ func TestParseQueryRangeParamsDashboardVarsSubstitution(t *testing.T) { }) } } + +func TestParseQueryRangeParamsPromQLVars(t *testing.T) { + reqCases := []struct { + desc string + compositeQuery v3.CompositeQuery + variables map[string]interface{} + expectErr bool + errMsg string + expectedQuery string + }{ + { + desc: "valid prom query with dashboard variables", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "http_calls_total{service_name=\"{{.service_name}}\", operation_name=~\"{{.operation_name}}\"}", + Disabled: false, + }, + }, + }, + variables: map[string]interface{}{ + "service_name": "route", + "operation_name": []interface{}{ + "GET /route", + "POST /route", + }, + }, + expectErr: false, + expectedQuery: "http_calls_total{service_name=\"route\", operation_name=~\"GET /route|POST /route\"}", + }, + { + desc: "valid prom query with dashboard variables", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "http_calls_total{service_name=\"{{.service_name}}\", status_code=~\"{{.status_code}}\"}", + Disabled: false, + }, + }, + }, + variables: map[string]interface{}{ + "service_name": "route", + "status_code": []interface{}{ + 200, + 505, + }, + }, + expectErr: false, + expectedQuery: "http_calls_total{service_name=\"route\", status_code=~\"200|505\"}", + }, + { + desc: "valid prom query with dashboard variables", + compositeQuery: v3.CompositeQuery{ + PanelType: v3.PanelTypeGraph, + QueryType: v3.QueryTypePromQL, + PromQueries: map[string]*v3.PromQuery{ + "A": { + Query: "http_calls_total{service_name=\"{{.service_name}}\", quantity=~\"{{.quantity}}\"}", + Disabled: false, + }, + }, + }, + variables: map[string]interface{}{ + "service_name": "route", + "quantity": []interface{}{ + 4.5, + 4.6, + }, + }, + expectErr: false, + expectedQuery: "http_calls_total{service_name=\"route\", quantity=~\"4.5|4.6\"}", + }, + } + + for _, tc := range reqCases { + t.Run(tc.desc, func(t *testing.T) { + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: time.Now().Add(-time.Hour).UnixMilli(), + End: time.Now().UnixMilli(), + Step: time.Minute.Microseconds(), + CompositeQuery: &tc.compositeQuery, + Variables: tc.variables, + } + + body := &bytes.Buffer{} + err := json.NewEncoder(body).Encode(queryRangeParams) + require.NoError(t, err) + req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body) + + parsedQueryRangeParams, apiErr := ParseQueryRangeParams(req) + if tc.expectErr { + require.Error(t, apiErr) + require.Contains(t, apiErr.Error(), tc.errMsg) + } else { + require.Nil(t, apiErr) + require.Equal(t, parsedQueryRangeParams.CompositeQuery.PromQueries["A"].Query, tc.expectedQuery) + } + }) + } +} diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go new file mode 100644 index 0000000000..4d809a18f3 --- /dev/null +++ b/pkg/query-service/app/querier/helper.go @@ -0,0 +1,304 @@ +package querier + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" + metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + "go.signoz.io/signoz/pkg/query-service/cache/status" + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/zap" +) + +func (q *querier) runBuilderQuery( + ctx context.Context, + builderQuery *v3.BuilderQuery, + params *v3.QueryRangeParamsV3, + keys map[string]v3.AttributeKey, + cacheKeys map[string]string, + ch chan channelResult, + wg *sync.WaitGroup, +) { + defer wg.Done() + queryName := builderQuery.QueryName + + var preferRPM bool + + if q.featureLookUp != nil { + preferRPM = q.featureLookUp.CheckFeature(constants.PreferRPM) == nil + } + + // TODO: handle other data sources + if builderQuery.DataSource == v3.DataSourceLogs { + var query string + var err error + // for ts query with limit replace it as it is already formed + if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { + limitQuery, err := logsV3.PrepareLogsQuery( + params.Start, + params.End, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} + return + } + placeholderQuery, err := logsV3.PrepareLogsQuery( + params.Start, + params.End, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: placeholderQuery, Series: nil} + return + } + query = fmt.Sprintf(placeholderQuery, limitQuery) + } else { + query, err = logsV3.PrepareLogsQuery( + params.Start, + params.End, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + logsV3.Options{PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + } + + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + return + } + + if builderQuery.DataSource == v3.DataSourceTraces { + + var query string + var err error + // for ts query with group by and limit form two queries + if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { + limitQuery, err := tracesV3.PrepareTracesQuery( + params.Start, + params.End, + params.CompositeQuery.PanelType, + builderQuery, + keys, + tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} + return + } + placeholderQuery, err := tracesV3.PrepareTracesQuery( + params.Start, + params.End, + params.CompositeQuery.PanelType, + builderQuery, + keys, + tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} + return + } + query = fmt.Sprintf(placeholderQuery, limitQuery) + } else { + query, err = tracesV3.PrepareTracesQuery( + params.Start, + params.End, + params.CompositeQuery.PanelType, + builderQuery, + keys, + tracesV3.Options{PreferRPM: preferRPM}, + ) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + } + + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + return + } + + // What is happening here? + // We are only caching the graph panel queries. A non-existant cache key means that the query is not cached. + // If the query is not cached, we execute the query and return the result without caching it. + if _, ok := cacheKeys[queryName]; !ok { + query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM}) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + return + } + + cacheKey := cacheKeys[queryName] + var cachedData []byte + if !params.NoCache && q.cache != nil { + var retrieveStatus status.RetrieveStatus + data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) + zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + if err == nil { + cachedData = data + } + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + query, err := metricsV3.PrepareMetricQuery( + miss.start, + miss.end, + params.CompositeQuery.QueryType, + params.CompositeQuery.PanelType, + builderQuery, + metricsV3.Options{}, + ) + if err != nil { + ch <- channelResult{ + Err: err, + Name: queryName, + Query: query, + Series: nil, + } + return + } + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + ch <- channelResult{ + Err: err, + Name: queryName, + Query: query, + Series: nil, + } + return + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + zap.S().Error("error unmarshalling cached data", zap.Error(err)) + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + ch <- channelResult{ + Err: nil, + Name: queryName, + Series: mergedSeries, + } + // Cache the seriesList for future queries + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { + mergedSeriesData, err := json.Marshal(mergedSeries) + if err != nil { + zap.S().Error("error marshalling merged series", zap.Error(err)) + return + } + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + zap.S().Error("error storing merged series", zap.Error(err)) + return + } + } +} + +func (q *querier) runBuilderExpression( + ctx context.Context, + builderQuery *v3.BuilderQuery, + params *v3.QueryRangeParamsV3, + keys map[string]v3.AttributeKey, + cacheKeys map[string]string, + ch chan channelResult, + wg *sync.WaitGroup, +) { + defer wg.Done() + + queryName := builderQuery.QueryName + + queries, err := q.builder.PrepareQueries(params, keys) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: "", Series: nil} + return + } + + if _, ok := cacheKeys[queryName]; !ok { + query := queries[queryName] + series, err := q.execClickHouseQuery(ctx, query) + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series} + return + } + + cacheKey := cacheKeys[queryName] + var cachedData []byte + if !params.NoCache && q.cache != nil { + var retrieveStatus status.RetrieveStatus + data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) + zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + if err == nil { + cachedData = data + } + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{ + Start: miss.start, + End: miss.end, + Step: params.Step, + NoCache: params.NoCache, + CompositeQuery: params.CompositeQuery, + Variables: params.Variables, + }, keys) + query := missQueries[queryName] + series, err := q.execClickHouseQuery(ctx, query) + if err != nil { + ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil} + return + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + zap.S().Error("error unmarshalling cached data", zap.Error(err)) + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + ch <- channelResult{ + Err: nil, + Name: queryName, + Series: mergedSeries, + } + // Cache the seriesList for future queries + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { + mergedSeriesData, err := json.Marshal(mergedSeries) + if err != nil { + zap.S().Error("error marshalling merged series", zap.Error(err)) + return + } + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + zap.S().Error("error storing merged series", zap.Error(err)) + return + } + } +} diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index e0cea90126..f5c9e07113 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -7,20 +7,30 @@ import ( "math" "sort" "strings" + "sync" "time" logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" + "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" "go.signoz.io/signoz/pkg/query-service/cache" - "go.signoz.io/signoz/pkg/query-service/cache/status" "go.signoz.io/signoz/pkg/query-service/interfaces" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.uber.org/multierr" "go.uber.org/zap" ) +type channelResult struct { + Series []*v3.Series + List []*v3.Row + Err error + Name string + Query string +} + type missInterval struct { start, end int64 // in milliseconds } @@ -32,6 +42,9 @@ type querier struct { fluxInterval time.Duration + builder *queryBuilder.QueryBuilder + featureLookUp interfaces.FeatureLookup + // used for testing // TODO(srikanthccv): remove this once we have a proper mock testingMode bool @@ -41,10 +54,11 @@ type querier struct { } type QuerierOptions struct { - Reader interfaces.Reader - Cache cache.Cache - KeyGenerator cache.KeyGenerator - FluxInterval time.Duration + Reader interfaces.Reader + Cache cache.Cache + KeyGenerator cache.KeyGenerator + FluxInterval time.Duration + FeatureLookup interfaces.FeatureLookup // used for testing TestingMode bool @@ -59,6 +73,13 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { keyGenerator: opts.KeyGenerator, fluxInterval: opts.FluxInterval, + builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{ + BuildTraceQuery: tracesV3.PrepareTracesQuery, + BuildLogQuery: logsV3.PrepareLogsQuery, + BuildMetricQuery: metricsV3.PrepareMetricQuery, + }, opts.FeatureLookup), + featureLookUp: opts.FeatureLookup, + testingMode: opts.TestingMode, returnedSeries: opts.ReturnedSeries, returnedErr: opts.ReturnedErr, @@ -160,7 +181,7 @@ func findMissingTimeRanges(start, end int64, seriesList []*v3.Series, fluxInterv var validMisses []missInterval for idx := range misses { miss := misses[idx] - if miss.start <= miss.end { + if miss.start < miss.end { validMisses = append(validMisses, miss) } } @@ -223,206 +244,246 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series { return mergedSeries } -func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) { +func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) { cacheKeys := q.keyGenerator.GenerateKeys(params) - seriesList := make([]*v3.Series, 0) - errQueriesByName := make(map[string]string) - var err error + ch := make(chan channelResult, len(params.CompositeQuery.BuilderQueries)) + var wg sync.WaitGroup for queryName, builderQuery := range params.CompositeQuery.BuilderQueries { - - // TODO: add support for logs and traces - if builderQuery.DataSource == v3.DataSourceLogs { - query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, logsV3.Options{}) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - series, err := q.execClickHouseQuery(ctx, query) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - seriesList = append(seriesList, series...) + if builderQuery.Disabled { continue } - - if builderQuery.DataSource == v3.DataSourceTraces { - query, err := tracesV3.PrepareTracesQuery(params.Start, params.End, params.CompositeQuery.PanelType, builderQuery, keys, tracesV3.Options{ - GraphLimitQtype: "", - PreferRPM: false, - }) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - - series, err := q.execClickHouseQuery(ctx, query) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - seriesList = append(seriesList, series...) - continue - } - - cacheKey := cacheKeys[queryName] - var cachedData []byte - if !params.NoCache { - var retrieveStatus status.RetrieveStatus - cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true) - zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) - if err != nil { - return nil, err, nil - } - } - misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) - for _, miss := range misses { - query, err := metricsV3.PrepareMetricQuery( - miss.start, - miss.end, - params.CompositeQuery.QueryType, - params.CompositeQuery.PanelType, - builderQuery, - metricsV3.Options{PreferRPM: false}, - ) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - series, err := q.execClickHouseQuery(ctx, query) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - errQueriesByName[queryName] = err.Error() - continue - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) - - seriesList = append(seriesList, mergedSeries...) - // Cache the seriesList for future queries - if len(missedSeries) > 0 { - mergedSeriesData, err := json.Marshal(mergedSeries) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } + wg.Add(1) + if queryName == builderQuery.Expression { + go q.runBuilderQuery(ctx, builderQuery, params, keys, cacheKeys, ch, &wg) + } else { + go q.runBuilderExpression(ctx, builderQuery, params, keys, cacheKeys, ch, &wg) } } - if len(errQueriesByName) > 0 { + + wg.Wait() + close(ch) + + results := make([]*v3.Result, 0) + errQueriesByName := make(map[string]string) + var errs []error + + for result := range ch { + if result.Err != nil { + errs = append(errs, result.Err) + errQueriesByName[result.Name] = result.Err.Error() + continue + } + results = append(results, &v3.Result{ + QueryName: result.Name, + Series: result.Series, + }) + } + + var err error + if len(errs) > 0 { err = fmt.Errorf("error in builder queries") } - return seriesList, err, errQueriesByName + + return results, err, errQueriesByName } -func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) { - seriesList := make([]*v3.Series, 0) - errQueriesByName := make(map[string]string) - var err error - for queryName, promQuery := range params.CompositeQuery.PromQueries { - cacheKey := q.keyGenerator.GenerateKeys(params)[queryName] - var cachedData []byte - var retrieveStatus status.RetrieveStatus - if !params.NoCache { - cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true) - zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) - } - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) - missedSeries := make([]*v3.Series, 0) - cachedSeries := make([]*v3.Series, 0) - for _, miss := range misses { - query := metricsV3.BuildPromQuery( - promQuery, - params.Step, - miss.start, - miss.end, - ) - series, err := q.execPromQuery(ctx, query) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - missedSeries = append(missedSeries, series...) - } - if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { - errQueriesByName[queryName] = err.Error() - continue - } - mergedSeries := mergeSerieses(cachedSeries, missedSeries) +func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) { + channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries)) + var wg sync.WaitGroup + cacheKeys := q.keyGenerator.GenerateKeys(params) - seriesList = append(seriesList, mergedSeries...) - // Cache the seriesList for future queries - if len(missedSeries) > 0 { - mergedSeriesData, err := json.Marshal(mergedSeries) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } - err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) - if err != nil { - errQueriesByName[queryName] = err.Error() - continue - } + for queryName, promQuery := range params.CompositeQuery.PromQueries { + if promQuery.Disabled { + continue } + wg.Add(1) + go func(queryName string, promQuery *v3.PromQuery) { + defer wg.Done() + cacheKey := cacheKeys[queryName] + var cachedData []byte + // Ensure NoCache is not set and cache is not nil + if !params.NoCache && q.cache != nil { + data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true) + zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String())) + if err == nil { + cachedData = data + } + } + misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData) + missedSeries := make([]*v3.Series, 0) + cachedSeries := make([]*v3.Series, 0) + for _, miss := range misses { + query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.start, miss.end) + series, err := q.execPromQuery(ctx, query) + if err != nil { + channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil} + return + } + missedSeries = append(missedSeries, series...) + } + if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil { + // ideally we should not be getting an error here + zap.S().Error("error unmarshalling cached data", zap.Error(err)) + } + mergedSeries := mergeSerieses(cachedSeries, missedSeries) + + channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries} + + // Cache the seriesList for future queries + if len(missedSeries) > 0 && !params.NoCache && q.cache != nil { + mergedSeriesData, err := json.Marshal(mergedSeries) + if err != nil { + zap.S().Error("error marshalling merged series", zap.Error(err)) + return + } + err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour) + if err != nil { + zap.S().Error("error storing merged series", zap.Error(err)) + return + } + } + }(queryName, promQuery) } - if len(errQueriesByName) > 0 { + wg.Wait() + close(channelResults) + + results := make([]*v3.Result, 0) + errQueriesByName := make(map[string]string) + var errs []error + + for result := range channelResults { + if result.Err != nil { + errs = append(errs, result.Err) + errQueriesByName[result.Name] = result.Err.Error() + continue + } + results = append(results, &v3.Result{ + QueryName: result.Name, + Series: result.Series, + }) + } + + var err error + if len(errs) > 0 { err = fmt.Errorf("error in prom queries") } - return seriesList, err, errQueriesByName + + return results, err, errQueriesByName } -func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) { - seriesList := make([]*v3.Series, 0) - errQueriesByName := make(map[string]string) - var err error +func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) { + channelResults := make(chan channelResult, len(params.CompositeQuery.ClickHouseQueries)) + var wg sync.WaitGroup for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries { - series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query) - if err != nil { - errQueriesByName[queryName] = err.Error() + if clickHouseQuery.Disabled { continue } - seriesList = append(seriesList, series...) + wg.Add(1) + go func(queryName string, clickHouseQuery *v3.ClickHouseQuery) { + defer wg.Done() + series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query) + channelResults <- channelResult{Err: err, Name: queryName, Query: clickHouseQuery.Query, Series: series} + }(queryName, clickHouseQuery) } - if len(errQueriesByName) > 0 { + wg.Wait() + close(channelResults) + + results := make([]*v3.Result, 0) + errQueriesByName := make(map[string]string) + var errs []error + + for result := range channelResults { + if result.Err != nil { + errs = append(errs, result.Err) + errQueriesByName[result.Name] = result.Err.Error() + continue + } + results = append(results, &v3.Result{ + QueryName: result.Name, + Series: result.Series, + }) + } + + var err error + if len(errs) > 0 { err = fmt.Errorf("error in clickhouse queries") } - return seriesList, err, errQueriesByName + return results, err, errQueriesByName } -func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) { - var seriesList []*v3.Series +func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) { + + queries, err := q.builder.PrepareQueries(params, keys) + + if err != nil { + return nil, err, nil + } + + ch := make(chan channelResult, len(queries)) + var wg sync.WaitGroup + + for name, query := range queries { + wg.Add(1) + go func(name, query string) { + defer wg.Done() + rowList, err := q.reader.GetListResultV3(ctx, query) + + if err != nil { + ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query} + return + } + ch <- channelResult{List: rowList, Name: name, Query: query} + }(name, query) + } + + wg.Wait() + close(ch) + + var errs []error + errQuriesByName := make(map[string]string) + res := make([]*v3.Result, 0) + // read values from the channel + for r := range ch { + if r.Err != nil { + errs = append(errs, r.Err) + errQuriesByName[r.Name] = r.Query + continue + } + res = append(res, &v3.Result{ + QueryName: r.Name, + List: r.List, + }) + } + if len(errs) != 0 { + return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName + } + return res, nil, nil +} + +func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) { + var results []*v3.Result var err error var errQueriesByName map[string]string if params.CompositeQuery != nil { switch params.CompositeQuery.QueryType { case v3.QueryTypeBuilder: - seriesList, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys) + if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace { + results, err, errQueriesByName = q.runBuilderListQueries(ctx, params, keys) + } else { + results, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys) + } case v3.QueryTypePromQL: - seriesList, err, errQueriesByName = q.runPromQueries(ctx, params) + results, err, errQueriesByName = q.runPromQueries(ctx, params) case v3.QueryTypeClickHouseSQL: - seriesList, err, errQueriesByName = q.runClickHouseQueries(ctx, params) + results, err, errQueriesByName = q.runClickHouseQueries(ctx, params) default: err = fmt.Errorf("invalid query type") } } - return seriesList, err, errQueriesByName + return results, err, errQueriesByName } func (q *querier) QueriesExecuted() []string { diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index 46855add7d..f4427a739a 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -406,9 +406,11 @@ func TestQueryRange(t *testing.T) { End: 1675115596722 + 120*60*1000, CompositeQuery: &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, BuilderQueries: map[string]*v3.BuilderQuery{ "A": { QueryName: "A", + DataSource: v3.DataSourceMetrics, StepInterval: 60, AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, Filters: &v3.FilterSet{ @@ -436,11 +438,13 @@ func TestQueryRange(t *testing.T) { End: 1675115596722 + 180*60*1000, CompositeQuery: &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, BuilderQueries: map[string]*v3.BuilderQuery{ "A": { QueryName: "A", StepInterval: 60, AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + DataSource: v3.DataSourceMetrics, Filters: &v3.FilterSet{ Operator: "AND", Items: []v3.FilterItem{ @@ -461,6 +465,73 @@ func TestQueryRange(t *testing.T) { }, }, }, + // No caching for traces & logs yet + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + StepInterval: 60, + DataSource: v3.DataSourceTraces, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "serviceName", IsColumn: false}, + {Key: "name", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 180*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeGraph, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + StepInterval: 60, + DataSource: v3.DataSourceTraces, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "serviceName", IsColumn: false}, + {Key: "name", IsColumn: false}, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + }, + }, + }, + }, } cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute}) opts := QuerierOptions{ @@ -489,6 +560,117 @@ func TestQueryRange(t *testing.T) { expectedTimeRangeInQueryString := []string{ fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000), fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", 1675115580000*1000000, (1675115580000+120*60*1000)*int64(1000000)), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)), + } + + for i, param := range params { + _, err, errByName := q.QueryRange(context.Background(), param, nil) + if err != nil { + t.Errorf("expected no error, got %s", err) + } + if len(errByName) > 0 { + t.Errorf("expected no error, got %v", errByName) + } + + if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) { + t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i]) + } + } +} + +func TestQueryRangeValueType(t *testing.T) { + // There shouldn't be any caching for value panel type + params := []*v3.QueryRangeParamsV3{ + { + Start: 1675115596722, + End: 1675115596722 + 120*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeValue, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + AggregateOperator: v3.AggregateOperatorSumRate, + Expression: "A", + ReduceTo: v3.ReduceToOperatorLast, + }, + }, + }, + }, + { + Start: 1675115596722 + 60*60*1000, + End: 1675115596722 + 180*60*1000, + Step: 5 * time.Minute.Milliseconds(), + CompositeQuery: &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeValue, + BuilderQueries: map[string]*v3.BuilderQuery{ + "A": { + QueryName: "A", + StepInterval: 60, + DataSource: v3.DataSourceTraces, + AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true}, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "method", IsColumn: false}, + Operator: "=", + Value: "GET", + }, + }, + }, + AggregateOperator: v3.AggregateOperatorP95, + Expression: "A", + ReduceTo: v3.ReduceToOperatorLast, + }, + }, + }, + }, + } + cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute}) + opts := QuerierOptions{ + Cache: cache, + Reader: nil, + FluxInterval: 5 * time.Minute, + KeyGenerator: queryBuilder.NewKeyGenerator(), + + TestingMode: true, + ReturnedSeries: []*v3.Series{ + { + Labels: map[string]string{ + "method": "GET", + "service_name": "test", + "__name__": "http_server_requests_seconds_count", + }, + Points: []v3.Point{ + {Timestamp: 1675115596722, Value: 1}, + {Timestamp: 1675115596722 + 60*60*1000, Value: 2}, + {Timestamp: 1675115596722 + 120*60*1000, Value: 3}, + }, + }, + }, + } + q := NewQuerier(opts) + // No caching + expectedTimeRangeInQueryString := []string{ + fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000), + fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)), } for i, param := range params { diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go index 690ba6fcc2..328b519289 100644 --- a/pkg/query-service/app/queryBuilder/query_builder.go +++ b/pkg/query-service/app/queryBuilder/query_builder.go @@ -275,12 +275,35 @@ func expressionToKey(expression *govaluate.EvaluableExpression, keys map[string] return formula.ExpressionString() } +func isMetricExpression(expression *govaluate.EvaluableExpression, params *v3.QueryRangeParamsV3) bool { + variables := unique(expression.Vars()) + for _, variable := range variables { + if params.CompositeQuery.BuilderQueries[variable].DataSource != v3.DataSourceMetrics { + return false + } + } + return true +} + func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string { keys := make(map[string]string) + // For non-graph panels, we don't support caching + if params.CompositeQuery.PanelType != v3.PanelTypeGraph { + return keys + } + + // Use query as the cache key for PromQL queries + if params.CompositeQuery.QueryType == v3.QueryTypePromQL { + for name, query := range params.CompositeQuery.PromQueries { + keys[name] = query.Query + } + return keys + } + // Build keys for each builder query for queryName, query := range params.CompositeQuery.BuilderQueries { - if query.Expression == queryName { + if query.Expression == queryName && query.DataSource == v3.DataSourceMetrics { var parts []string // We need to build uniqe cache query for BuilderQuery @@ -321,6 +344,10 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri if query.Expression != query.QueryName { expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs) + if !isMetricExpression(expression, params) { + continue + } + expressionCacheKey := expressionToKey(expression, keys) keys[query.QueryName] = expressionCacheKey } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index d2a14e8ff8..69f3a9367f 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -22,11 +22,12 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/clickhouseReader" "go.signoz.io/signoz/pkg/query-service/app/dashboards" "go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline" - opamp "go.signoz.io/signoz/pkg/query-service/app/opamp" + "go.signoz.io/signoz/pkg/query-service/app/opamp" opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model" "go.signoz.io/signoz/pkg/query-service/app/explorer" "go.signoz.io/signoz/pkg/query-service/auth" + "go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/dao" "go.signoz.io/signoz/pkg/query-service/featureManager" @@ -54,6 +55,8 @@ type ServerOptions struct { MaxIdleConns int MaxOpenConns int DialTimeout time.Duration + CacheConfigPath string + FluxInterval string } // Server runs HTTP, Mux and a grpc server @@ -134,6 +137,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { return nil, err } + var c cache.Cache + if serverOptions.CacheConfigPath != "" { + cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath) + if err != nil { + return nil, err + } + c = cache.NewCache(cacheOpts) + } + + fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval) // ingestion pipelines manager logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite") if err != nil { @@ -153,6 +166,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { RuleManager: rm, FeatureFlags: fm, LogsParsingPipelineController: logParsingPipelineController, + Cache: c, + FluxInterval: fluxInterval, }) if err != nil { return nil, err diff --git a/pkg/query-service/config/cache-config.yml b/pkg/query-service/config/cache-config.yml new file mode 100644 index 0000000000..b1bd329584 --- /dev/null +++ b/pkg/query-service/config/cache-config.yml @@ -0,0 +1,4 @@ +provider: "inmemory" +inmemory: + ttl: 60m + cleanupInterval: 10m diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 8b3697f89c..15cc4868b3 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -100,7 +100,7 @@ type Reader interface { } type Querier interface { - QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) + QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) // test helpers QueriesExecuted() []string diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index c0fdc3d5d2..76382b10c0 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -33,7 +33,7 @@ func main() { var disableRules bool // the url used to build link in the alert messages in slack and other systems - var ruleRepoURL string + var ruleRepoURL, cacheConfigPath, fluxInterval string var preferDelta bool var preferSpanMetrics bool @@ -51,6 +51,8 @@ func main() { flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)") flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)") flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)") + flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)") + flag.StringVar(&fluxInterval, "flux-interval", "5m", "(cache config to use)") flag.Parse() loggerMgr := initZapLog() @@ -72,6 +74,8 @@ func main() { MaxIdleConns: maxIdleConns, MaxOpenConns: maxOpenConns, DialTimeout: dialTimeout, + CacheConfigPath: cacheConfigPath, + FluxInterval: fluxInterval, } // Read the jwt secret key diff --git a/pkg/query-service/model/db.go b/pkg/query-service/model/db.go index 9dcefd7ee0..f1d7817fc7 100644 --- a/pkg/query-service/model/db.go +++ b/pkg/query-service/model/db.go @@ -53,7 +53,6 @@ func (uf UserFlag) Value() (driver.Value, error) { } func (uf *UserFlag) Scan(value interface{}) error { - fmt.Println(" value:", value) if value == "" { return nil } diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index 429471616f..754de3eae0 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -136,6 +136,7 @@ type QueryRangeParamsV2 struct { Step int64 `json:"step"` CompositeMetricQuery *CompositeMetricQuery `json:"compositeMetricQuery"` Variables map[string]interface{} `json:"variables,omitempty"` + NoCache bool `json:"noCache"` } type DashboardVars struct { diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 46828b66c1..3ca7126636 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "math" + "sort" "strconv" "time" @@ -494,6 +495,12 @@ type Series struct { Points []MetricPoint `json:"values"` } +func (s *Series) SortPoints() { + sort.Slice(s.Points, func(i, j int) bool { + return s.Points[i].Timestamp < s.Points[j].Timestamp + }) +} + type MetricPoint struct { Timestamp int64 Value float64 @@ -505,6 +512,17 @@ func (p *MetricPoint) MarshalJSON() ([]byte, error) { return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v}) } +// UnmarshalJSON implements json.Unmarshaler. +func (p *MetricPoint) UnmarshalJSON(b []byte) error { + var a [2]interface{} + if err := json.Unmarshal(b, &a); err != nil { + return err + } + p.Timestamp = int64(a[0].(float64) * 1000) + p.Value, _ = strconv.ParseFloat(a[1].(string), 64) + return nil +} + type ShowCreateTableStatement struct { Statement string `json:"statement" ch:"statement"` }