Merge pull request #3588 from SigNoz/release/v0.29.2

Release/v0.29.2
This commit is contained in:
Srikanth Chekuri 2023-09-19 21:54:16 +05:30 committed by GitHub
commit f0022cd13f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1325 additions and 277 deletions

View File

@ -19,7 +19,7 @@ LOCAL_GOOS ?= $(shell go env GOOS)
LOCAL_GOARCH ?= $(shell go env GOARCH)
REPONAME ?= signoz
DOCKER_TAG ?= latest
DOCKER_TAG ?= $(subst v,,$(BUILD_VERSION))
FRONTEND_DOCKER_IMAGE ?= frontend
QUERY_SERVICE_DOCKER_IMAGE ?= query-service

View File

@ -144,7 +144,7 @@ services:
condition: on-failure
query-service:
image: signoz/query-service:0.29.1
image: signoz/query-service:0.29.2
command:
[
"-config=/root/config/prometheus.yml",
@ -184,7 +184,7 @@ services:
<<: *clickhouse-depend
frontend:
image: signoz/frontend:0.29.1
image: signoz/frontend:0.29.2
deploy:
restart_policy:
condition: on-failure

View File

@ -162,7 +162,7 @@ services:
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
query-service:
image: signoz/query-service:${DOCKER_TAG:-0.29.1}
image: signoz/query-service:${DOCKER_TAG:-0.29.2}
container_name: signoz-query-service
command:
[
@ -201,7 +201,7 @@ services:
<<: *clickhouse-depend
frontend:
image: signoz/frontend:${DOCKER_TAG:-0.29.1}
image: signoz/frontend:${DOCKER_TAG:-0.29.2}
container_name: signoz-frontend
restart: on-failure
depends_on:

View File

@ -10,6 +10,7 @@ import (
"go.signoz.io/signoz/ee/query-service/license"
baseapp "go.signoz.io/signoz/pkg/query-service/app"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/cache"
baseint "go.signoz.io/signoz/pkg/query-service/interfaces"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
rules "go.signoz.io/signoz/pkg/query-service/rules"
@ -29,6 +30,9 @@ type APIHandlerOptions struct {
FeatureFlags baseint.FeatureLookup
LicenseManager *license.Manager
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
Cache cache.Cache
// Querier Influx Interval
FluxInterval time.Duration
}
type APIHandler struct {
@ -51,6 +55,8 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
RuleManager: opts.RulesManager,
FeatureFlags: opts.FeatureFlags,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
Cache: opts.Cache,
FluxInterval: opts.FluxInterval,
})
if err != nil {

View File

@ -35,6 +35,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/opamp"
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/cache"
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/healthcheck"
basealm "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
@ -62,6 +63,8 @@ type ServerOptions struct {
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
CacheConfigPath string
FluxInterval string
}
// Server runs HTTP api service
@ -189,6 +192,21 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
telemetry.GetInstance().SetReader(reader)
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
if err != nil {
return nil, err
}
apiOpts := api.APIHandlerOptions{
DataConnector: reader,
SkipConfig: skipConfig,
@ -202,6 +220,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
FeatureFlags: lm,
LicenseManager: lm,
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
}
apiHandler, err := api.NewAPIHandler(apiOpts)

View File

@ -82,6 +82,7 @@ func main() {
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL string
var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool
var preferDelta bool
var preferSpanMetrics bool
@ -99,10 +100,14 @@ func main() {
flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)")
flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)")
flag.StringVar(&ruleRepoURL, "rules.repo-url", baseconst.AlertHelpPage, "(host address used to build rule link in alert messages)")
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(cache config to use)")
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
flag.Parse()
loggerMgr := initZapLog(enableQueryServiceLogOTLPExport)
zap.ReplaceGlobals(loggerMgr)
defer loggerMgr.Sync() // flushes buffer, if any
@ -121,6 +126,8 @@ func main() {
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
}
// Read the jwt secret key

View File

@ -10,7 +10,7 @@ import { NotificationProvider } from 'hooks/useNotifications';
import { ResourceProvider } from 'hooks/useResourceAttribute';
import history from 'lib/history';
import { QueryBuilderProvider } from 'providers/QueryBuilder';
import { Suspense, useState } from 'react';
import { Suspense, useEffect, useState } from 'react';
import { useDispatch, useSelector } from 'react-redux';
import { Route, Router, Switch } from 'react-router-dom';
import { Dispatch } from 'redux';
@ -18,6 +18,7 @@ import { AppState } from 'store/reducers';
import AppActions from 'types/actions';
import { UPDATE_FEATURE_FLAG_RESPONSE } from 'types/actions/app';
import AppReducer from 'types/reducer/app';
import { trackPageView } from 'utils/segmentAnalytics';
import PrivateRoute from './Private';
import defaultRoutes from './routes';
@ -32,7 +33,7 @@ function App(): JSX.Element {
const dispatch = useDispatch<Dispatch<AppActions>>();
const { hostname } = window.location;
const { hostname, pathname } = window.location;
const featureResponse = useGetFeatureFlag((allFlags) => {
const isOnboardingEnabled =
@ -73,6 +74,20 @@ function App(): JSX.Element {
}
});
useEffect(() => {
if (isLoggedInState && user && user.userId && user.email) {
window.analytics.identify(user?.userId, {
email: user?.email || '',
name: user?.name || '',
});
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [isLoggedInState]);
useEffect(() => {
trackPageView(pathname);
}, [pathname]);
return (
<ConfigProvider theme={themeConfig}>
<Router history={history}>

View File

@ -1,4 +1,5 @@
import { Col, Row, Space } from 'antd';
import NewExplorerCTA from 'container/NewExplorerCTA';
import ShowBreadcrumbs from '../TopNav/Breadcrumbs';
import DateTimeSelector from '../TopNav/DateTimeSelection';
@ -18,6 +19,7 @@ function LocalTopNav({
<Col span={8}>
<Row justify="end">
<Space align="start" size={30} direction="horizontal">
<NewExplorerCTA />
{actions}
{renderPermissions?.isDateTimeEnabled && (
<div>

View File

@ -4,14 +4,17 @@ import { Button, Row } from 'antd';
import Editor from 'components/Editor';
import { useMemo } from 'react';
import { useCopyToClipboard } from 'react-use';
import { ILog } from 'types/api/logs/log';
interface JSONViewProps {
logData: ILog;
}
import { JSONViewProps } from './LogDetailedView.types';
import { aggregateAttributesResourcesToString } from './utils';
function JSONView({ logData }: JSONViewProps): JSX.Element {
const [, copyToClipboard] = useCopyToClipboard();
const LogJsonData = useMemo(() => JSON.stringify(logData, null, 2), [logData]);
const LogJsonData = useMemo(
() => aggregateAttributesResourcesToString(logData),
[logData],
);
return (
<div>
<Row

View File

@ -1,4 +1,5 @@
import { MetricsType } from 'container/MetricsApplication/constant';
import { ILog } from 'types/api/logs/log';
export interface BodyTitleRendererProps {
title: string;
@ -18,3 +19,7 @@ export interface IFieldAttributes {
newField?: string;
logType?: MetricsType;
}
export interface JSONViewProps {
logData: ILog;
}

View File

@ -1,6 +1,7 @@
import { DataNode } from 'antd/es/tree';
import { MetricsType } from 'container/MetricsApplication/constant';
import { uniqueId } from 'lodash-es';
import { ILog, ILogAggregateAttributesResources } from 'types/api/logs/log';
import { DataTypes } from 'types/api/queryBuilder/queryAutocompleteResponse';
import BodyTitleRenderer from './BodyTitleRenderer';
@ -12,6 +13,15 @@ export const recursiveParseJSON = (obj: string): Record<string, unknown> => {
if (typeof value === 'string') {
return recursiveParseJSON(value);
}
if (typeof value === 'object') {
Object.entries(value).forEach(([key, val]) => {
if (typeof val === 'string') {
value[key] = val.trim();
} else if (typeof val === 'object') {
value[key] = recursiveParseJSON(JSON.stringify(val));
}
});
}
return value;
} catch (e) {
return {};
@ -174,3 +184,35 @@ export const getFieldAttributes = (field: string): IFieldAttributes => {
return { dataType, newField, logType };
};
export const aggregateAttributesResourcesToString = (logData: ILog): string => {
const outputJson: ILogAggregateAttributesResources = {
body: logData.body,
date: logData.date,
id: logData.id,
severityNumber: logData.severityNumber,
severityText: logData.severityText,
spanId: logData.spanId,
timestamp: logData.timestamp,
traceFlags: logData.traceFlags,
traceId: logData.traceId,
attributes: {},
resources: {},
};
Object.keys(logData).forEach((key) => {
if (key.startsWith('attributes_')) {
outputJson.attributes = outputJson.attributes || {};
Object.assign(outputJson.attributes, logData[key as keyof ILog]);
} else if (key.startsWith('resources_')) {
outputJson.resources = outputJson.resources || {};
Object.assign(outputJson.resources, logData[key as keyof ILog]);
} else {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
outputJson[key] = logData[key as keyof ILog];
}
});
return JSON.stringify(outputJson, null, 2);
};

View File

@ -1,3 +1,11 @@
import ROUTES from 'constants/routes';
export const RIBBON_STYLES = {
top: '-0.75rem',
};
export const buttonText = {
[ROUTES.LOGS_EXPLORER]: 'Switch to Old Logs Explorer',
[ROUTES.TRACE]: 'Try new Traces Explorer',
[ROUTES.LOGS]: 'Switch to New Logs Explorer',
};

View File

@ -2,47 +2,57 @@ import { CompassOutlined } from '@ant-design/icons';
import { Badge, Button } from 'antd';
import ROUTES from 'constants/routes';
import history from 'lib/history';
import { useMemo } from 'react';
import { useCallback, useMemo } from 'react';
import { useLocation } from 'react-router-dom';
import { RIBBON_STYLES } from './config';
import { buttonText, RIBBON_STYLES } from './config';
function NewExplorerCTA(): JSX.Element | null {
const location = useLocation();
const isTraceOrLogsExplorerPage = useMemo(
() => location.pathname === ROUTES.LOGS || location.pathname === ROUTES.TRACE,
() =>
location.pathname === ROUTES.LOGS_EXPLORER ||
location.pathname === ROUTES.TRACE ||
location.pathname === ROUTES.LOGS,
[location.pathname],
);
const onClickHandler = (): void => {
if (location.pathname === ROUTES.LOGS) {
history.push(ROUTES.LOGS_EXPLORER);
const onClickHandler = useCallback((): void => {
if (location.pathname === ROUTES.LOGS_EXPLORER) {
history.push(ROUTES.LOGS);
} else if (location.pathname === ROUTES.TRACE) {
history.push(ROUTES.TRACES_EXPLORER);
} else if (location.pathname === ROUTES.LOGS) {
history.push(ROUTES.LOGS_EXPLORER);
}
};
}, [location.pathname]);
const buttonText = useMemo(
() =>
`Try new ${ROUTES.LOGS === location.pathname ? 'Logs' : 'Traces'} Explorer`,
[location.pathname],
);
if (!isTraceOrLogsExplorerPage) {
return null;
}
return (
<Badge.Ribbon style={RIBBON_STYLES} text="New">
const button = useMemo(
() => (
<Button
icon={<CompassOutlined />}
onClick={onClickHandler}
danger
type="primary"
>
{buttonText}
{buttonText[location.pathname]}
</Button>
),
[location.pathname, onClickHandler],
);
if (!isTraceOrLogsExplorerPage) {
return null;
}
if (location.pathname === ROUTES.LOGS_EXPLORER) {
return button;
}
return (
<Badge.Ribbon style={RIBBON_STYLES} text="New">
{button}
</Badge.Ribbon>
);
}

View File

@ -151,18 +151,16 @@ export const Query = memo(function Query({
case PANEL_TYPES.TIME_SERIES: {
return (
<>
{!isMetricsDataSource && (
<Col span={11}>
<Row gutter={[11, 5]}>
<Col flex="5.93rem">
<FilterLabel label="Limit" />
</Col>
<Col flex="1 1 12.5rem">
<LimitFilter query={query} onChange={handleChangeLimit} />
</Col>
</Row>
</Col>
)}
<Col span={11}>
<Row gutter={[11, 5]}>
<Col flex="5.93rem">
<FilterLabel label="Limit" />
</Col>
<Col flex="1 1 12.5rem">
<LimitFilter query={query} onChange={handleChangeLimit} />
</Col>
</Row>
</Col>
<Col span={11}>
<Row gutter={[11, 5]}>
<Col flex="5.93rem">
@ -173,16 +171,14 @@ export const Query = memo(function Query({
</Col>
</Row>
</Col>
{!isMetricsDataSource && (
<Col span={11}>
<Row gutter={[11, 5]}>
<Col flex="5.93rem">
<FilterLabel label="Order by" />
</Col>
<Col flex="1 1 12.5rem">{renderOrderByFilter()}</Col>
</Row>
</Col>
)}
<Col span={11}>
<Row gutter={[11, 5]}>
<Col flex="5.93rem">
<FilterLabel label="Order by" />
</Col>
<Col flex="1 1 12.5rem">{renderOrderByFilter()}</Col>
</Row>
</Col>
<Col span={11}>{renderAggregateEveryFilter()}</Col>
</>
@ -251,7 +247,6 @@ export const Query = memo(function Query({
}
}, [
panelType,
isMetricsDataSource,
query,
filterConfigs?.limit?.isHidden,
filterConfigs?.having?.isHidden,

View File

@ -31,7 +31,7 @@ const menuItems: SidebarMenu[] = [
icon: <MenuOutlined />,
},
{
key: ROUTES.LOGS,
key: ROUTES.LOGS_EXPLORER,
label: 'Logs',
icon: <AlignLeftOutlined />,
},

View File

@ -0,0 +1,71 @@
import { initialAutocompleteData, OPERATORS } from 'constants/queryBuilder';
import getStep from 'lib/getStep';
import {
BaseAutocompleteData,
DataTypes,
} from 'types/api/queryBuilder/queryAutocompleteResponse';
import { Query, TagFilter } from 'types/api/queryBuilder/queryBuilderData';
import { EQueryType } from 'types/common/dashboard';
import { DataSource, LogsAggregatorOperator } from 'types/common/queryBuilder';
import { v4 as uuid } from 'uuid';
export const getTraceToLogsQuery = (
traceId: string,
minTime: number,
maxTime: number,
): Query => {
const key: BaseAutocompleteData = {
id: uuid(),
dataType: DataTypes.String,
isColumn: true,
type: '',
isJSON: false,
key: 'trace_id',
};
const filters: TagFilter = {
items: [
{
id: uuid(),
op: OPERATORS.IN,
value: traceId,
key,
},
],
op: 'AND',
};
const query: Query = {
id: uuid(),
queryType: EQueryType.QUERY_BUILDER,
clickhouse_sql: [],
promql: [],
builder: {
queryData: [
{
filters,
dataSource: DataSource.LOGS,
disabled: false,
limit: null,
aggregateAttribute: initialAutocompleteData,
aggregateOperator: LogsAggregatorOperator.NOOP,
expression: 'A',
groupBy: [],
having: [],
legend: '',
orderBy: [],
queryName: 'A',
reduceTo: 'min',
stepInterval: getStep({
start: minTime,
end: maxTime,
inputFormat: 'ns',
}),
},
],
queryFormulas: [],
},
};
return query;
};

View File

@ -1,13 +1,19 @@
import { Button, Modal, Tabs, Tooltip, Typography } from 'antd';
import Editor from 'components/Editor';
import { StyledSpace } from 'components/Styled';
import { QueryParams } from 'constants/query';
import ROUTES from 'constants/routes';
import { useIsDarkMode } from 'hooks/useDarkMode';
import createQueryParams from 'lib/createQueryParams';
import history from 'lib/history';
import { useMemo, useState } from 'react';
import { useSelector } from 'react-redux';
import { useParams } from 'react-router-dom';
import { AppState } from 'store/reducers';
import { ITraceTree } from 'types/api/trace/getTraceItem';
import { GlobalReducer } from 'types/reducer/globalTime';
import { getTraceToLogsQuery } from './config';
import Events from './Events';
import {
CardContainer,
@ -21,6 +27,10 @@ import Tags from './Tags';
function SelectedSpanDetails(props: SelectedSpanDetailsProps): JSX.Element {
const { tree, firstSpanStartTime } = props;
const { maxTime, minTime } = useSelector<AppState, GlobalReducer>(
(state) => state.globalTime,
);
const { id: traceId } = useParams<Params>();
const isDarkMode = useIsDarkMode();
@ -75,9 +85,15 @@ function SelectedSpanDetails(props: SelectedSpanDetailsProps): JSX.Element {
];
const onLogsHandler = (): void => {
const query = encodeURIComponent(`trace_id IN ('${traceId}')`);
const query = getTraceToLogsQuery(traceId, minTime, maxTime);
history.push(`${ROUTES.LOGS}?q=${query}`);
history.push(
`${ROUTES.LOGS_EXPLORER}?${createQueryParams({
[QueryParams.compositeQuery]: JSON.stringify(query),
[QueryParams.startTime]: minTime,
[QueryParams.endTime]: maxTime,
})}`,
);
};
return (

View File

@ -7,7 +7,7 @@ import {
import { useQueryBuilder } from 'hooks/queryBuilder/useQueryBuilder';
import { getOperatorsBySourceAndPanelType } from 'lib/newQueryBuilder/getOperatorsBySourceAndPanelType';
import { findDataTypeOfOperator } from 'lib/query/findDataTypeOfOperator';
import { useCallback, useEffect, useMemo, useState } from 'react';
import { useCallback, useEffect, useState } from 'react';
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData';
import {
@ -148,13 +148,9 @@ export const useQueryOperations: UseQueryOperations = ({
[query, index, handleSetQueryData],
);
const isMetricsDataSource = useMemo(
() => query.dataSource === DataSource.METRICS,
[query.dataSource],
);
const isTracePanelType = useMemo(() => panelType === PANEL_TYPES.TRACE, [
panelType,
]);
const isMetricsDataSource = query.dataSource === DataSource.METRICS;
const isTracePanelType = panelType === PANEL_TYPES.TRACE;
useEffect(() => {
if (initialDataSource && dataSource !== initialDataSource) return;

View File

@ -99,5 +99,72 @@
}
})();
</script>
<script>
//Set your SEGMENT_ID
const SEGMENT_ID = '<%= htmlWebpackPlugin.options.SEGMENT_ID %>';
!(function () {
var analytics = (window.analytics = window.analytics || []);
if (!analytics.initialize)
if (analytics.invoked)
window.console &&
console.error &&
console.error('Segment snippet included twice.');
else {
analytics.invoked = !0;
analytics.methods = [
'trackSubmit',
'trackClick',
'trackLink',
'trackForm',
'pageview',
'identify',
'reset',
'group',
'track',
'ready',
'alias',
'debug',
'page',
'once',
'off',
'on',
'addSourceMiddleware',
'addIntegrationMiddleware',
'setAnonymousId',
'addDestinationMiddleware',
];
analytics.factory = function (e) {
return function () {
if (window.analytics.initialized)
return window.analytics[e].apply(window.analytics, arguments);
var i = Array.prototype.slice.call(arguments);
i.unshift(e);
analytics.push(i);
return analytics;
};
};
for (var i = 0; i < analytics.methods.length; i++) {
var key = analytics.methods[i];
analytics[key] = analytics.factory(key);
}
analytics.load = function (key, i) {
var t = document.createElement('script');
t.type = 'text/javascript';
t.async = !0;
t.src =
'https://cdn.segment.com/analytics.js/v1/' + key + '/analytics.min.js';
var n = document.getElementsByTagName('script')[0];
n.parentNode.insertBefore(t, n);
analytics._loadOptions = i;
};
analytics._writeKey = SEGMENT_ID;
analytics.SNIPPET_VERSION = '4.16.1';
analytics.load(SEGMENT_ID);
analytics.page();
}
})();
</script>
</body>
</html>

View File

@ -14,3 +14,20 @@ export interface ILog {
attributesInt: Record<string, never>;
attributesFloat: Record<string, never>;
}
type OmitAttributesResources = Pick<
ILog,
Exclude<
keyof ILog,
| 'resources_string'
| 'attributesString'
| 'attributes_string'
| 'attributesInt'
| 'attributesFloat'
>
>;
export type ILogAggregateAttributesResources = OmitAttributesResources & {
attributes: Record<string, never>;
resources: Record<string, never>;
};

View File

@ -3,7 +3,7 @@ import { compose, Store } from 'redux';
declare global {
interface Window {
store: Store;
analytics: Record<string, any>;
__REDUX_DEVTOOLS_EXTENSION_COMPOSE__: typeof compose;
}
}

View File

@ -0,0 +1,12 @@
function trackPageView(pageName: string): void {
window.analytics.page(pageName);
}
function trackEvent(
eventName: string,
properties: Record<string, string>,
): void {
window.analytics.track(eventName, properties);
}
export { trackEvent, trackPageView };

View File

@ -20,6 +20,7 @@ const plugins = [
new HtmlWebpackPlugin({
template: 'src/index.html.ejs',
INTERCOM_APP_ID: process.env.INTERCOM_APP_ID,
SEGMENT_ID: process.env.SEGMENT_ID,
}),
new webpack.ProvidePlugin({
process: 'process/browser',
@ -29,6 +30,7 @@ const plugins = [
NODE_ENV: process.env.NODE_ENV,
FRONTEND_API_ENDPOINT: process.env.FRONTEND_API_ENDPOINT,
INTERCOM_APP_ID: process.env.INTERCOM_APP_ID,
SEGMENT_ID: process.env.SEGMENT_ID,
}),
}),
];

View File

@ -28,9 +28,11 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/metrics"
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/parser"
"go.signoz.io/signoz/pkg/query-service/app/querier"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
@ -51,8 +53,9 @@ import (
type status string
const (
statusSuccess status = "success"
statusError status = "error"
statusSuccess status = "success"
statusError status = "error"
defaultFluxInterval = 5 * time.Minute
)
// NewRouter creates and configures a Gorilla Router.
@ -73,6 +76,7 @@ type APIHandler struct {
ruleManager *rules.Manager
featureFlags interfaces.FeatureLookup
ready func(http.HandlerFunc) http.HandlerFunc
querier interfaces.Querier
queryBuilder *queryBuilder.QueryBuilder
preferDelta bool
preferSpanMetrics bool
@ -114,6 +118,11 @@ type APIHandlerOpts struct {
// Log parsing pipelines
LogsParsingPipelineController *logparsingpipeline.LogParsingPipelineController
// cache
Cache cache.Cache
// Querier Influx Interval
FluxInterval time.Duration
}
// NewAPIHandler returns an APIHandler
@ -124,6 +133,16 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
return nil, err
}
querierOpts := querier.QuerierOptions{
Reader: opts.Reader,
Cache: opts.Cache,
KeyGenerator: queryBuilder.NewKeyGenerator(),
FluxInterval: opts.FluxInterval,
FeatureLookup: opts.FeatureFlags,
}
querier := querier.NewQuerier(querierOpts)
aH := &APIHandler{
reader: opts.Reader,
appDao: opts.AppDao,
@ -137,6 +156,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
ruleManager: opts.RuleManager,
featureFlags: opts.FeatureFlags,
LogsParsingPipelineController: opts.LogsParsingPipelineController,
querier: querier,
}
builderOpts := queryBuilder.QueryBuilderOptions{
@ -2965,9 +2985,8 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
var result []*v3.Result
var err error
var errQuriesByName map[string]string
var queries map[string]string
switch queryRangeParams.CompositeQuery.QueryType {
case v3.QueryTypeBuilder:
var spanKeys map[string]v3.AttributeKey
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypeBuilder {
// check if any enrichment is required for logs if yes then enrich them
if logsv3.EnrichmentRequired(queryRangeParams) {
// get the fields if any logs query is present
@ -2981,42 +3000,16 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
logsv3.Enrich(queryRangeParams, fields)
}
var spanKeys map[string]v3.AttributeKey
spanKeys, err = aH.getSpanKeysV3(ctx, queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
queries, err = aH.queryBuilder.PrepareQueries(queryRangeParams, spanKeys)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
if queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeList || queryRangeParams.CompositeQuery.PanelType == v3.PanelTypeTrace {
result, err, errQuriesByName = aH.execClickHouseListQueries(r.Context(), queries)
} else {
result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries)
}
case v3.QueryTypeClickHouseSQL:
queries := make(map[string]string)
for name, query := range queryRangeParams.CompositeQuery.ClickHouseQueries {
if query.Disabled {
continue
}
queries[name] = query.Query
}
result, err, errQuriesByName = aH.execClickHouseGraphQueries(r.Context(), queries)
case v3.QueryTypePromQL:
result, err, errQuriesByName = aH.execPromQueries(r.Context(), queryRangeParams)
default:
err = fmt.Errorf("invalid query type")
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName)
return
}
result, err, errQuriesByName = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
@ -3062,7 +3055,7 @@ func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParam
limit := builderQueries[result.QueryName].Limit
orderByList := builderQueries[result.QueryName].OrderBy
if limit != 0 {
if limit >= 0 {
if len(orderByList) == 0 {
// If no orderBy is specified, sort by value in descending order
orderByList = []v3.OrderBy{{ColumnName: constants.SigNozOrderByValue, Order: "desc"}}
@ -3070,6 +3063,18 @@ func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParam
sort.SliceStable(result.Series, func(i, j int) bool {
for _, orderBy := range orderByList {
if orderBy.ColumnName == constants.SigNozOrderByValue {
// For table type queries (we rely on the fact that one value for row), sort
// based on final aggregation value
if len(result.Series[i].Points) == 1 && len(result.Series[j].Points) == 1 {
if orderBy.Order == "asc" {
return result.Series[i].Points[0].Value < result.Series[j].Points[0].Value
} else if orderBy.Order == "desc" {
return result.Series[i].Points[0].Value > result.Series[j].Points[0].Value
}
}
// For graph type queries, sort based on GroupingSetsPoint
if result.Series[i].GroupingSetsPoint == nil || result.Series[j].GroupingSetsPoint == nil {
// Handle nil GroupingSetsPoint, if needed
// Here, we assume non-nil values are always less than nil values
@ -3102,7 +3107,7 @@ func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParam
return i < j
})
if len(result.Series) > int(limit) {
if limit > 0 && len(result.Series) > int(limit) {
result.Series = result.Series[:limit]
}
}

View File

@ -497,7 +497,7 @@ func PromFormattedValue(v interface{}) string {
case float32, float64:
return fmt.Sprintf("%f", x)
case string:
return fmt.Sprintf("%s", x)
return x
case bool:
return fmt.Sprintf("%v", x)
case []interface{}:
@ -506,7 +506,12 @@ func PromFormattedValue(v interface{}) string {
}
switch x[0].(type) {
case string, int, float32, float64, bool:
return strings.Trim(strings.Join(strings.Fields(fmt.Sprint(x)), "|"), "[]")
// list of values joined by | for promql - a value can contain whitespace
var str []string
for _, sVal := range x {
str = append(str, fmt.Sprintf("%v", sVal))
}
return strings.Join(str, "|")
default:
zap.L().Error("invalid type for prom formatted value", zap.Any("type", reflect.TypeOf(x[0])))
return ""

View File

@ -82,6 +82,7 @@ func TestBuildQueryWithMultipleQueries(t *testing.T) {
},
"B": {
QueryName: "B",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{Key: "name2"},
AggregateOperator: v3.AggregateOperatorRateMax,
Expression: "B",

View File

@ -844,7 +844,6 @@ func parseFilterAttributeKeyRequest(r *http.Request) (*v3.FilterAttributeKeyRequ
dataSource := v3.DataSource(r.URL.Query().Get("dataSource"))
aggregateOperator := v3.AggregateOperator(r.URL.Query().Get("aggregateOperator"))
aggregateAttribute := r.URL.Query().Get("aggregateAttribute")
limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
if err != nil {
limit = 50
@ -1045,5 +1044,29 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
}
}
// replace go template variables in prometheus query
if queryRangeParams.CompositeQuery.QueryType == v3.QueryTypePromQL {
for _, promQuery := range queryRangeParams.CompositeQuery.PromQueries {
if promQuery.Disabled {
continue
}
tmpl := template.New("prometheus-query")
tmpl, err := tmpl.Parse(promQuery.Query)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
var query bytes.Buffer
// replace go template variables
querytemplate.AssignReservedVarsV3(queryRangeParams)
err = tmpl.Execute(&query, queryRangeParams.Variables)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
promQuery.Query = query.String()
}
}
return queryRangeParams, nil
}

View File

@ -757,3 +757,108 @@ func TestParseQueryRangeParamsDashboardVarsSubstitution(t *testing.T) {
})
}
}
func TestParseQueryRangeParamsPromQLVars(t *testing.T) {
reqCases := []struct {
desc string
compositeQuery v3.CompositeQuery
variables map[string]interface{}
expectErr bool
errMsg string
expectedQuery string
}{
{
desc: "valid prom query with dashboard variables",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"A": {
Query: "http_calls_total{service_name=\"{{.service_name}}\", operation_name=~\"{{.operation_name}}\"}",
Disabled: false,
},
},
},
variables: map[string]interface{}{
"service_name": "route",
"operation_name": []interface{}{
"GET /route",
"POST /route",
},
},
expectErr: false,
expectedQuery: "http_calls_total{service_name=\"route\", operation_name=~\"GET /route|POST /route\"}",
},
{
desc: "valid prom query with dashboard variables",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"A": {
Query: "http_calls_total{service_name=\"{{.service_name}}\", status_code=~\"{{.status_code}}\"}",
Disabled: false,
},
},
},
variables: map[string]interface{}{
"service_name": "route",
"status_code": []interface{}{
200,
505,
},
},
expectErr: false,
expectedQuery: "http_calls_total{service_name=\"route\", status_code=~\"200|505\"}",
},
{
desc: "valid prom query with dashboard variables",
compositeQuery: v3.CompositeQuery{
PanelType: v3.PanelTypeGraph,
QueryType: v3.QueryTypePromQL,
PromQueries: map[string]*v3.PromQuery{
"A": {
Query: "http_calls_total{service_name=\"{{.service_name}}\", quantity=~\"{{.quantity}}\"}",
Disabled: false,
},
},
},
variables: map[string]interface{}{
"service_name": "route",
"quantity": []interface{}{
4.5,
4.6,
},
},
expectErr: false,
expectedQuery: "http_calls_total{service_name=\"route\", quantity=~\"4.5|4.6\"}",
},
}
for _, tc := range reqCases {
t.Run(tc.desc, func(t *testing.T) {
queryRangeParams := &v3.QueryRangeParamsV3{
Start: time.Now().Add(-time.Hour).UnixMilli(),
End: time.Now().UnixMilli(),
Step: time.Minute.Microseconds(),
CompositeQuery: &tc.compositeQuery,
Variables: tc.variables,
}
body := &bytes.Buffer{}
err := json.NewEncoder(body).Encode(queryRangeParams)
require.NoError(t, err)
req := httptest.NewRequest(http.MethodPost, "/api/v3/query_range", body)
parsedQueryRangeParams, apiErr := ParseQueryRangeParams(req)
if tc.expectErr {
require.Error(t, apiErr)
require.Contains(t, apiErr.Error(), tc.errMsg)
} else {
require.Nil(t, apiErr)
require.Equal(t, parsedQueryRangeParams.CompositeQuery.PromQueries["A"].Query, tc.expectedQuery)
}
})
}
}

View File

@ -0,0 +1,304 @@
package querier
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache/status"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/zap"
)
func (q *querier) runBuilderQuery(
ctx context.Context,
builderQuery *v3.BuilderQuery,
params *v3.QueryRangeParamsV3,
keys map[string]v3.AttributeKey,
cacheKeys map[string]string,
ch chan channelResult,
wg *sync.WaitGroup,
) {
defer wg.Done()
queryName := builderQuery.QueryName
var preferRPM bool
if q.featureLookUp != nil {
preferRPM = q.featureLookUp.CheckFeature(constants.PreferRPM) == nil
}
// TODO: handle other data sources
if builderQuery.DataSource == v3.DataSourceLogs {
var query string
var err error
// for ts query with limit replace it as it is already formed
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := logsV3.PrepareLogsQuery(
params.Start,
params.End,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
return
}
placeholderQuery, err := logsV3.PrepareLogsQuery(
params.Start,
params.End,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: placeholderQuery, Series: nil}
return
}
query = fmt.Sprintf(placeholderQuery, limitQuery)
} else {
query, err = logsV3.PrepareLogsQuery(
params.Start,
params.End,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
logsV3.Options{PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
}
}
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
}
series, err := q.execClickHouseQuery(ctx, query)
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
return
}
if builderQuery.DataSource == v3.DataSourceTraces {
var query string
var err error
// for ts query with group by and limit form two queries
if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 {
limitQuery, err := tracesV3.PrepareTracesQuery(
params.Start,
params.End,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
return
}
placeholderQuery, err := tracesV3.PrepareTracesQuery(
params.Start,
params.End,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit, PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil}
return
}
query = fmt.Sprintf(placeholderQuery, limitQuery)
} else {
query, err = tracesV3.PrepareTracesQuery(
params.Start,
params.End,
params.CompositeQuery.PanelType,
builderQuery,
keys,
tracesV3.Options{PreferRPM: preferRPM},
)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
}
}
series, err := q.execClickHouseQuery(ctx, query)
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
return
}
// What is happening here?
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.
if _, ok := cacheKeys[queryName]; !ok {
query, err := metricsV3.PrepareMetricQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, metricsV3.Options{PreferRPM: preferRPM})
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
}
series, err := q.execClickHouseQuery(ctx, query)
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
return
}
cacheKey := cacheKeys[queryName]
var cachedData []byte
if !params.NoCache && q.cache != nil {
var retrieveStatus status.RetrieveStatus
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err == nil {
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
query, err := metricsV3.PrepareMetricQuery(
miss.start,
miss.end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
metricsV3.Options{},
)
if err != nil {
ch <- channelResult{
Err: err,
Name: queryName,
Query: query,
Series: nil,
}
return
}
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
ch <- channelResult{
Err: err,
Name: queryName,
Query: query,
Series: nil,
}
return
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
zap.S().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
ch <- channelResult{
Err: nil,
Name: queryName,
Series: mergedSeries,
}
// Cache the seriesList for future queries
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
mergedSeriesData, err := json.Marshal(mergedSeries)
if err != nil {
zap.S().Error("error marshalling merged series", zap.Error(err))
return
}
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.S().Error("error storing merged series", zap.Error(err))
return
}
}
}
func (q *querier) runBuilderExpression(
ctx context.Context,
builderQuery *v3.BuilderQuery,
params *v3.QueryRangeParamsV3,
keys map[string]v3.AttributeKey,
cacheKeys map[string]string,
ch chan channelResult,
wg *sync.WaitGroup,
) {
defer wg.Done()
queryName := builderQuery.QueryName
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: "", Series: nil}
return
}
if _, ok := cacheKeys[queryName]; !ok {
query := queries[queryName]
series, err := q.execClickHouseQuery(ctx, query)
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
return
}
cacheKey := cacheKeys[queryName]
var cachedData []byte
if !params.NoCache && q.cache != nil {
var retrieveStatus status.RetrieveStatus
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err == nil {
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{
Start: miss.start,
End: miss.end,
Step: params.Step,
NoCache: params.NoCache,
CompositeQuery: params.CompositeQuery,
Variables: params.Variables,
}, keys)
query := missQueries[queryName]
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
return
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
zap.S().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
ch <- channelResult{
Err: nil,
Name: queryName,
Series: mergedSeries,
}
// Cache the seriesList for future queries
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
mergedSeriesData, err := json.Marshal(mergedSeries)
if err != nil {
zap.S().Error("error marshalling merged series", zap.Error(err))
return
}
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.S().Error("error storing merged series", zap.Error(err))
return
}
}
}

View File

@ -7,20 +7,30 @@ import (
"math"
"sort"
"strings"
"sync"
"time"
logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/cache/status"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
)
type channelResult struct {
Series []*v3.Series
List []*v3.Row
Err error
Name string
Query string
}
type missInterval struct {
start, end int64 // in milliseconds
}
@ -32,6 +42,9 @@ type querier struct {
fluxInterval time.Duration
builder *queryBuilder.QueryBuilder
featureLookUp interfaces.FeatureLookup
// used for testing
// TODO(srikanthccv): remove this once we have a proper mock
testingMode bool
@ -41,10 +54,11 @@ type querier struct {
}
type QuerierOptions struct {
Reader interfaces.Reader
Cache cache.Cache
KeyGenerator cache.KeyGenerator
FluxInterval time.Duration
Reader interfaces.Reader
Cache cache.Cache
KeyGenerator cache.KeyGenerator
FluxInterval time.Duration
FeatureLookup interfaces.FeatureLookup
// used for testing
TestingMode bool
@ -59,6 +73,13 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier {
keyGenerator: opts.KeyGenerator,
fluxInterval: opts.FluxInterval,
builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{
BuildTraceQuery: tracesV3.PrepareTracesQuery,
BuildLogQuery: logsV3.PrepareLogsQuery,
BuildMetricQuery: metricsV3.PrepareMetricQuery,
}, opts.FeatureLookup),
featureLookUp: opts.FeatureLookup,
testingMode: opts.TestingMode,
returnedSeries: opts.ReturnedSeries,
returnedErr: opts.ReturnedErr,
@ -160,7 +181,7 @@ func findMissingTimeRanges(start, end int64, seriesList []*v3.Series, fluxInterv
var validMisses []missInterval
for idx := range misses {
miss := misses[idx]
if miss.start <= miss.end {
if miss.start < miss.end {
validMisses = append(validMisses, miss)
}
}
@ -223,206 +244,246 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
return mergedSeries
}
func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) {
func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) {
cacheKeys := q.keyGenerator.GenerateKeys(params)
seriesList := make([]*v3.Series, 0)
errQueriesByName := make(map[string]string)
var err error
ch := make(chan channelResult, len(params.CompositeQuery.BuilderQueries))
var wg sync.WaitGroup
for queryName, builderQuery := range params.CompositeQuery.BuilderQueries {
// TODO: add support for logs and traces
if builderQuery.DataSource == v3.DataSourceLogs {
query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, logsV3.Options{})
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
seriesList = append(seriesList, series...)
if builderQuery.Disabled {
continue
}
if builderQuery.DataSource == v3.DataSourceTraces {
query, err := tracesV3.PrepareTracesQuery(params.Start, params.End, params.CompositeQuery.PanelType, builderQuery, keys, tracesV3.Options{
GraphLimitQtype: "",
PreferRPM: false,
})
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
seriesList = append(seriesList, series...)
continue
}
cacheKey := cacheKeys[queryName]
var cachedData []byte
if !params.NoCache {
var retrieveStatus status.RetrieveStatus
cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true)
zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err != nil {
return nil, err, nil
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
query, err := metricsV3.PrepareMetricQuery(
miss.start,
miss.end,
params.CompositeQuery.QueryType,
params.CompositeQuery.PanelType,
builderQuery,
metricsV3.Options{PreferRPM: false},
)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
series, err := q.execClickHouseQuery(ctx, query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
errQueriesByName[queryName] = err.Error()
continue
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
seriesList = append(seriesList, mergedSeries...)
// Cache the seriesList for future queries
if len(missedSeries) > 0 {
mergedSeriesData, err := json.Marshal(mergedSeries)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
wg.Add(1)
if queryName == builderQuery.Expression {
go q.runBuilderQuery(ctx, builderQuery, params, keys, cacheKeys, ch, &wg)
} else {
go q.runBuilderExpression(ctx, builderQuery, params, keys, cacheKeys, ch, &wg)
}
}
if len(errQueriesByName) > 0 {
wg.Wait()
close(ch)
results := make([]*v3.Result, 0)
errQueriesByName := make(map[string]string)
var errs []error
for result := range ch {
if result.Err != nil {
errs = append(errs, result.Err)
errQueriesByName[result.Name] = result.Err.Error()
continue
}
results = append(results, &v3.Result{
QueryName: result.Name,
Series: result.Series,
})
}
var err error
if len(errs) > 0 {
err = fmt.Errorf("error in builder queries")
}
return seriesList, err, errQueriesByName
return results, err, errQueriesByName
}
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) {
seriesList := make([]*v3.Series, 0)
errQueriesByName := make(map[string]string)
var err error
for queryName, promQuery := range params.CompositeQuery.PromQueries {
cacheKey := q.keyGenerator.GenerateKeys(params)[queryName]
var cachedData []byte
var retrieveStatus status.RetrieveStatus
if !params.NoCache {
cachedData, retrieveStatus, err = q.cache.Retrieve(cacheKey, true)
zap.L().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
}
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
query := metricsV3.BuildPromQuery(
promQuery,
params.Step,
miss.start,
miss.end,
)
series, err := q.execPromQuery(ctx, query)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
errQueriesByName[queryName] = err.Error()
continue
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) {
channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries))
var wg sync.WaitGroup
cacheKeys := q.keyGenerator.GenerateKeys(params)
seriesList = append(seriesList, mergedSeries...)
// Cache the seriesList for future queries
if len(missedSeries) > 0 {
mergedSeriesData, err := json.Marshal(mergedSeries)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
errQueriesByName[queryName] = err.Error()
continue
}
for queryName, promQuery := range params.CompositeQuery.PromQueries {
if promQuery.Disabled {
continue
}
wg.Add(1)
go func(queryName string, promQuery *v3.PromQuery) {
defer wg.Done()
cacheKey := cacheKeys[queryName]
var cachedData []byte
// Ensure NoCache is not set and cache is not nil
if !params.NoCache && q.cache != nil {
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
zap.S().Debug("cache retrieve status", zap.String("status", retrieveStatus.String()))
if err == nil {
cachedData = data
}
}
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
missedSeries := make([]*v3.Series, 0)
cachedSeries := make([]*v3.Series, 0)
for _, miss := range misses {
query := metricsV3.BuildPromQuery(promQuery, params.Step, miss.start, miss.end)
series, err := q.execPromQuery(ctx, query)
if err != nil {
channelResults <- channelResult{Err: err, Name: queryName, Query: query.Query, Series: nil}
return
}
missedSeries = append(missedSeries, series...)
}
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
// ideally we should not be getting an error here
zap.S().Error("error unmarshalling cached data", zap.Error(err))
}
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
channelResults <- channelResult{Err: nil, Name: queryName, Query: promQuery.Query, Series: mergedSeries}
// Cache the seriesList for future queries
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil {
mergedSeriesData, err := json.Marshal(mergedSeries)
if err != nil {
zap.S().Error("error marshalling merged series", zap.Error(err))
return
}
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
if err != nil {
zap.S().Error("error storing merged series", zap.Error(err))
return
}
}
}(queryName, promQuery)
}
if len(errQueriesByName) > 0 {
wg.Wait()
close(channelResults)
results := make([]*v3.Result, 0)
errQueriesByName := make(map[string]string)
var errs []error
for result := range channelResults {
if result.Err != nil {
errs = append(errs, result.Err)
errQueriesByName[result.Name] = result.Err.Error()
continue
}
results = append(results, &v3.Result{
QueryName: result.Name,
Series: result.Series,
})
}
var err error
if len(errs) > 0 {
err = fmt.Errorf("error in prom queries")
}
return seriesList, err, errQueriesByName
return results, err, errQueriesByName
}
func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Series, error, map[string]string) {
seriesList := make([]*v3.Series, 0)
errQueriesByName := make(map[string]string)
var err error
func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]string) {
channelResults := make(chan channelResult, len(params.CompositeQuery.ClickHouseQueries))
var wg sync.WaitGroup
for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries {
series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query)
if err != nil {
errQueriesByName[queryName] = err.Error()
if clickHouseQuery.Disabled {
continue
}
seriesList = append(seriesList, series...)
wg.Add(1)
go func(queryName string, clickHouseQuery *v3.ClickHouseQuery) {
defer wg.Done()
series, err := q.execClickHouseQuery(ctx, clickHouseQuery.Query)
channelResults <- channelResult{Err: err, Name: queryName, Query: clickHouseQuery.Query, Series: series}
}(queryName, clickHouseQuery)
}
if len(errQueriesByName) > 0 {
wg.Wait()
close(channelResults)
results := make([]*v3.Result, 0)
errQueriesByName := make(map[string]string)
var errs []error
for result := range channelResults {
if result.Err != nil {
errs = append(errs, result.Err)
errQueriesByName[result.Name] = result.Err.Error()
continue
}
results = append(results, &v3.Result{
QueryName: result.Name,
Series: result.Series,
})
}
var err error
if len(errs) > 0 {
err = fmt.Errorf("error in clickhouse queries")
}
return seriesList, err, errQueriesByName
return results, err, errQueriesByName
}
func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string) {
var seriesList []*v3.Series
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) {
queries, err := q.builder.PrepareQueries(params, keys)
if err != nil {
return nil, err, nil
}
ch := make(chan channelResult, len(queries))
var wg sync.WaitGroup
for name, query := range queries {
wg.Add(1)
go func(name, query string) {
defer wg.Done()
rowList, err := q.reader.GetListResultV3(ctx, query)
if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
return
}
ch <- channelResult{List: rowList, Name: name, Query: query}
}(name, query)
}
wg.Wait()
close(ch)
var errs []error
errQuriesByName := make(map[string]string)
res := make([]*v3.Result, 0)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
res = append(res, &v3.Result{
QueryName: r.Name,
List: r.List,
})
}
if len(errs) != 0 {
return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
}
return res, nil, nil
}
func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string) {
var results []*v3.Result
var err error
var errQueriesByName map[string]string
if params.CompositeQuery != nil {
switch params.CompositeQuery.QueryType {
case v3.QueryTypeBuilder:
seriesList, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys)
if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace {
results, err, errQueriesByName = q.runBuilderListQueries(ctx, params, keys)
} else {
results, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys)
}
case v3.QueryTypePromQL:
seriesList, err, errQueriesByName = q.runPromQueries(ctx, params)
results, err, errQueriesByName = q.runPromQueries(ctx, params)
case v3.QueryTypeClickHouseSQL:
seriesList, err, errQueriesByName = q.runClickHouseQueries(ctx, params)
results, err, errQueriesByName = q.runClickHouseQueries(ctx, params)
default:
err = fmt.Errorf("invalid query type")
}
}
return seriesList, err, errQueriesByName
return results, err, errQueriesByName
}
func (q *querier) QueriesExecuted() []string {

View File

@ -406,9 +406,11 @@ func TestQueryRange(t *testing.T) {
End: 1675115596722 + 120*60*1000,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
DataSource: v3.DataSourceMetrics,
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
Filters: &v3.FilterSet{
@ -436,11 +438,13 @@ func TestQueryRange(t *testing.T) {
End: 1675115596722 + 180*60*1000,
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
DataSource: v3.DataSourceMetrics,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
@ -461,6 +465,73 @@ func TestQueryRange(t *testing.T) {
},
},
},
// No caching for traces & logs yet
{
Start: 1675115596722,
End: 1675115596722 + 120*60*1000,
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
StepInterval: 60,
DataSource: v3.DataSourceTraces,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "method", IsColumn: false},
Operator: "=",
Value: "GET",
},
},
},
GroupBy: []v3.AttributeKey{
{Key: "serviceName", IsColumn: false},
{Key: "name", IsColumn: false},
},
AggregateOperator: v3.AggregateOperatorP95,
Expression: "A",
},
},
},
},
{
Start: 1675115596722 + 60*60*1000,
End: 1675115596722 + 180*60*1000,
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeGraph,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
StepInterval: 60,
DataSource: v3.DataSourceTraces,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "method", IsColumn: false},
Operator: "=",
Value: "GET",
},
},
},
GroupBy: []v3.AttributeKey{
{Key: "serviceName", IsColumn: false},
{Key: "name", IsColumn: false},
},
AggregateOperator: v3.AggregateOperatorP95,
Expression: "A",
},
},
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 5 * time.Minute, CleanupInterval: 10 * time.Minute})
opts := QuerierOptions{
@ -489,6 +560,117 @@ func TestQueryRange(t *testing.T) {
expectedTimeRangeInQueryString := []string{
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000),
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000),
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", 1675115580000*1000000, (1675115580000+120*60*1000)*int64(1000000)),
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)),
}
for i, param := range params {
_, err, errByName := q.QueryRange(context.Background(), param, nil)
if err != nil {
t.Errorf("expected no error, got %s", err)
}
if len(errByName) > 0 {
t.Errorf("expected no error, got %v", errByName)
}
if !strings.Contains(q.QueriesExecuted()[i], expectedTimeRangeInQueryString[i]) {
t.Errorf("expected query to contain %s, got %s", expectedTimeRangeInQueryString[i], q.QueriesExecuted()[i])
}
}
}
func TestQueryRangeValueType(t *testing.T) {
// There shouldn't be any caching for value panel type
params := []*v3.QueryRangeParamsV3{
{
Start: 1675115596722,
End: 1675115596722 + 120*60*1000,
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeValue,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{Key: "http_server_requests_seconds_count", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "method", IsColumn: false},
Operator: "=",
Value: "GET",
},
},
},
AggregateOperator: v3.AggregateOperatorSumRate,
Expression: "A",
ReduceTo: v3.ReduceToOperatorLast,
},
},
},
},
{
Start: 1675115596722 + 60*60*1000,
End: 1675115596722 + 180*60*1000,
Step: 5 * time.Minute.Milliseconds(),
CompositeQuery: &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeValue,
BuilderQueries: map[string]*v3.BuilderQuery{
"A": {
QueryName: "A",
StepInterval: 60,
DataSource: v3.DataSourceTraces,
AggregateAttribute: v3.AttributeKey{Key: "durationNano", Type: v3.AttributeKeyTypeUnspecified, DataType: "float64", IsColumn: true},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "method", IsColumn: false},
Operator: "=",
Value: "GET",
},
},
},
AggregateOperator: v3.AggregateOperatorP95,
Expression: "A",
ReduceTo: v3.ReduceToOperatorLast,
},
},
},
},
}
cache := inmemory.New(&inmemory.Options{TTL: 60 * time.Minute, CleanupInterval: 10 * time.Minute})
opts := QuerierOptions{
Cache: cache,
Reader: nil,
FluxInterval: 5 * time.Minute,
KeyGenerator: queryBuilder.NewKeyGenerator(),
TestingMode: true,
ReturnedSeries: []*v3.Series{
{
Labels: map[string]string{
"method": "GET",
"service_name": "test",
"__name__": "http_server_requests_seconds_count",
},
Points: []v3.Point{
{Timestamp: 1675115596722, Value: 1},
{Timestamp: 1675115596722 + 60*60*1000, Value: 2},
{Timestamp: 1675115596722 + 120*60*1000, Value: 3},
},
},
},
}
q := NewQuerier(opts)
// No caching
expectedTimeRangeInQueryString := []string{
fmt.Sprintf("timestamp_ms >= %d AND timestamp_ms <= %d", 1675115580000, 1675115580000+120*60*1000),
fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675115580000+60*60*1000)*int64(1000000), (1675115580000+180*60*1000)*int64(1000000)),
}
for i, param := range params {

View File

@ -275,12 +275,35 @@ func expressionToKey(expression *govaluate.EvaluableExpression, keys map[string]
return formula.ExpressionString()
}
func isMetricExpression(expression *govaluate.EvaluableExpression, params *v3.QueryRangeParamsV3) bool {
variables := unique(expression.Vars())
for _, variable := range variables {
if params.CompositeQuery.BuilderQueries[variable].DataSource != v3.DataSourceMetrics {
return false
}
}
return true
}
func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[string]string {
keys := make(map[string]string)
// For non-graph panels, we don't support caching
if params.CompositeQuery.PanelType != v3.PanelTypeGraph {
return keys
}
// Use query as the cache key for PromQL queries
if params.CompositeQuery.QueryType == v3.QueryTypePromQL {
for name, query := range params.CompositeQuery.PromQueries {
keys[name] = query.Query
}
return keys
}
// Build keys for each builder query
for queryName, query := range params.CompositeQuery.BuilderQueries {
if query.Expression == queryName {
if query.Expression == queryName && query.DataSource == v3.DataSourceMetrics {
var parts []string
// We need to build uniqe cache query for BuilderQuery
@ -321,6 +344,10 @@ func (c *cacheKeyGenerator) GenerateKeys(params *v3.QueryRangeParamsV3) map[stri
if query.Expression != query.QueryName {
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(query.Expression, EvalFuncs)
if !isMetricExpression(expression, params) {
continue
}
expressionCacheKey := expressionToKey(expression, keys)
keys[query.QueryName] = expressionCacheKey
}

View File

@ -22,11 +22,12 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
opamp "go.signoz.io/signoz/pkg/query-service/app/opamp"
"go.signoz.io/signoz/pkg/query-service/app/opamp"
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/app/explorer"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/dao"
"go.signoz.io/signoz/pkg/query-service/featureManager"
@ -54,6 +55,8 @@ type ServerOptions struct {
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
CacheConfigPath string
FluxInterval string
}
// Server runs HTTP, Mux and a grpc server
@ -134,6 +137,16 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
return nil, err
}
var c cache.Cache
if serverOptions.CacheConfigPath != "" {
cacheOpts, err := cache.LoadFromYAMLCacheConfigFile(serverOptions.CacheConfigPath)
if err != nil {
return nil, err
}
c = cache.NewCache(cacheOpts)
}
fluxInterval, err := time.ParseDuration(serverOptions.FluxInterval)
// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(localDB, "sqlite")
if err != nil {
@ -153,6 +166,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
RuleManager: rm,
FeatureFlags: fm,
LogsParsingPipelineController: logParsingPipelineController,
Cache: c,
FluxInterval: fluxInterval,
})
if err != nil {
return nil, err

View File

@ -0,0 +1,4 @@
provider: "inmemory"
inmemory:
ttl: 60m
cleanupInterval: 10m

View File

@ -100,7 +100,7 @@ type Reader interface {
}
type Querier interface {
QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Series, error, map[string]string)
QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]string)
// test helpers
QueriesExecuted() []string

View File

@ -33,7 +33,7 @@ func main() {
var disableRules bool
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL string
var ruleRepoURL, cacheConfigPath, fluxInterval string
var preferDelta bool
var preferSpanMetrics bool
@ -51,6 +51,8 @@ func main() {
flag.IntVar(&maxOpenConns, "max-open-conns", 100, "(max connections for use at any time.)")
flag.DurationVar(&dialTimeout, "dial-timeout", 5*time.Second, "(the maximum time to establish a connection.)")
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(cache config to use)")
flag.Parse()
loggerMgr := initZapLog()
@ -72,6 +74,8 @@ func main() {
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
}
// Read the jwt secret key

View File

@ -53,7 +53,6 @@ func (uf UserFlag) Value() (driver.Value, error) {
}
func (uf *UserFlag) Scan(value interface{}) error {
fmt.Println(" value:", value)
if value == "" {
return nil
}

View File

@ -136,6 +136,7 @@ type QueryRangeParamsV2 struct {
Step int64 `json:"step"`
CompositeMetricQuery *CompositeMetricQuery `json:"compositeMetricQuery"`
Variables map[string]interface{} `json:"variables,omitempty"`
NoCache bool `json:"noCache"`
}
type DashboardVars struct {

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"math"
"sort"
"strconv"
"time"
@ -494,6 +495,12 @@ type Series struct {
Points []MetricPoint `json:"values"`
}
func (s *Series) SortPoints() {
sort.Slice(s.Points, func(i, j int) bool {
return s.Points[i].Timestamp < s.Points[j].Timestamp
})
}
type MetricPoint struct {
Timestamp int64
Value float64
@ -505,6 +512,17 @@ func (p *MetricPoint) MarshalJSON() ([]byte, error) {
return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v})
}
// UnmarshalJSON implements json.Unmarshaler.
func (p *MetricPoint) UnmarshalJSON(b []byte) error {
var a [2]interface{}
if err := json.Unmarshal(b, &a); err != nil {
return err
}
p.Timestamp = int64(a[0].(float64) * 1000)
p.Value, _ = strconv.ParseFloat(a[1].(string), 64)
return nil
}
type ShowCreateTableStatement struct {
Statement string `json:"statement" ch:"statement"`
}