Mirror of https://github.com/SigNoz/signoz
Commit c8d0f7638e
@@ -146,7 +146,7 @@ services:
        condition: on-failure

  query-service:
-    image: signoz/query-service:0.58.1
+    image: signoz/query-service:0.58.2
    command:
      [
        "-config=/root/config/prometheus.yml",
@@ -186,7 +186,7 @@ services:
    <<: *db-depend

  frontend:
-    image: signoz/frontend:0.58.1
+    image: signoz/frontend:0.58.2
    deploy:
      restart_policy:
        condition: on-failure
@@ -162,7 +162,7 @@ services:
  # Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`

  query-service:
-    image: signoz/query-service:${DOCKER_TAG:-0.58.1}
+    image: signoz/query-service:${DOCKER_TAG:-0.58.2}
    container_name: signoz-query-service
    command:
      [
@@ -201,7 +201,7 @@ services:
    <<: *db-depend

  frontend:
-    image: signoz/frontend:${DOCKER_TAG:-0.58.1}
+    image: signoz/frontend:${DOCKER_TAG:-0.58.2}
    container_name: signoz-frontend
    restart: on-failure
    depends_on:
@@ -167,7 +167,7 @@ services:
  # Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`

  query-service:
-    image: signoz/query-service:${DOCKER_TAG:-0.58.1}
+    image: signoz/query-service:${DOCKER_TAG:-0.58.2}
    container_name: signoz-query-service
    command:
      [
@@ -208,7 +208,7 @@ services:
    <<: *db-depend

  frontend:
-    image: signoz/frontend:${DOCKER_TAG:-0.58.1}
+    image: signoz/frontend:${DOCKER_TAG:-0.58.2}
    container_name: signoz-frontend
    restart: on-failure
    depends_on:
@@ -1,5 +1,5 @@
 # use a minimal alpine image
-FROM alpine:3.18.6
+FROM alpine:3.20.3

 # Add Maintainer Info
 LABEL maintainer="signoz"
@@ -67,6 +67,30 @@ func StartManager(dbType string, db *sqlx.DB, useLicensesV3 bool, features ...ba
 		repo: &repo,
 	}

+	if useLicensesV3 {
+		// get active license from the db
+		active, err := m.repo.GetActiveLicense(context.Background())
+		if err != nil {
+			return m, err
+		}
+
+		// if we have an active license then need to fetch the complete details
+		if active != nil {
+			// fetch the new license structure from control plane
+			licenseV3, apiError := validate.ValidateLicenseV3(active.Key)
+			if apiError != nil {
+				return m, apiError
+			}
+
+			// insert the licenseV3 in sqlite db
+			apiError = m.repo.InsertLicenseV3(context.Background(), licenseV3)
+			// if the license already exists move ahead.
+			if apiError != nil && apiError.Typ != model.ErrorConflict {
+				return m, apiError
+			}
+		}
+	}
+
 	if err := m.start(useLicensesV3, features...); err != nil {
 		return m, err
 	}
@@ -61,6 +61,11 @@ func NewAnomalyRule(

 	zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts))

+	if p.RuleCondition.CompareOp == baserules.ValueIsBelow {
+		target := -1 * *p.RuleCondition.Target
+		p.RuleCondition.Target = &target
+	}
+
 	baseRule, err := baserules.NewBaseRule(id, p, reader, opts...)
 	if err != nil {
 		return nil, err
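Editor's note: the sign flip in the hunk above rests on a simple identity — when deviations are scored symmetrically, a "value is below t" condition is equivalent to "negated value is above -t". A minimal numeric spot-check (TypeScript, illustrative only; this snippet is not part of the commit):

```typescript
// x < t  <=>  -x > -t : the identity behind negating the target for ValueIsBelow.
const t = 10;
for (const x of [3, 10, 42]) {
	console.assert((x < t) === (-x > -t));
}
```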
@@ -8,7 +8,7 @@ import { ALERTS_DATA_SOURCE_MAP } from 'constants/alerts';
 import ROUTES from 'constants/routes';
 import useComponentPermission from 'hooks/useComponentPermission';
 import useFetch from 'hooks/useFetch';
-import { useCallback, useEffect, useState } from 'react';
+import { useCallback, useEffect, useRef, useState } from 'react';
 import { useTranslation } from 'react-i18next';
 import { useSelector } from 'react-redux';
 import { AppState } from 'store/reducers';
@@ -83,16 +83,22 @@ function BasicInfo({
 		window.open(ROUTES.CHANNELS_NEW, '_blank');
 		// eslint-disable-next-line react-hooks/exhaustive-deps
 	}, []);

+	const hasLoggedEvent = useRef(false);
+
 	useEffect(() => {
-		if (!channels.loading && isNewRule) {
+		if (!channels.loading && isNewRule && !hasLoggedEvent.current) {
 			logEvent('Alert: New alert creation page visited', {
 				dataSource: ALERTS_DATA_SOURCE_MAP[alertDef?.alertType as AlertTypes],
 				numberOfChannels: channels?.payload?.length,
 			});
+			hasLoggedEvent.current = true;
 		}
 		// eslint-disable-next-line react-hooks/exhaustive-deps
-	}, [channels.payload, channels.loading]);
+	}, [channels.loading]);
+
+	const refetchChannels = async (): Promise<void> => {
+		await channels.refetch();
+	};

 	return (
 		<>
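Editor's note: the `hasLoggedEvent` ref in the hunk above is the standard fire-once guard for analytics inside an effect — writing to a ref does not re-render, so the event cannot double-fire when the dependency changes again. A minimal sketch of the pattern (hook name and shape are illustrative, not from the commit):

```typescript
import { useEffect, useRef } from 'react';

// Hypothetical once-only effect: runs `fire` the first time `ready` becomes true.
export function useFireOnce(ready: boolean, fire: () => void): void {
	const fired = useRef(false);
	useEffect(() => {
		if (ready && !fired.current) {
			fire();
			fired.current = true; // a ref write does not trigger a re-render
		}
	}, [ready, fire]);
}
```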
@@ -197,7 +203,7 @@ function BasicInfo({
 					{!shouldBroadCastToAllChannels && (
 						<Tooltip
 							title={
-								noChannels
+								noChannels && !addNewChannelPermission
 									? 'No channels. Ask an admin to create a notification channel'
 									: undefined
 							}
@@ -212,10 +218,10 @@ function BasicInfo({
 							]}
 						>
 							<ChannelSelect
-								disabled={
-									shouldBroadCastToAllChannels || noChannels || !!channels.loading
-								}
+								onDropdownOpen={refetchChannels}
+								disabled={shouldBroadCastToAllChannels}
 								currentValue={alertDef.preferredChannels}
 								handleCreateNewChannels={handleCreateNewChannels}
 								channels={channels}
 								onSelectChannels={(preferredChannels): void => {
 									setAlertDef({
@@ -1,24 +1,33 @@
-import { Select } from 'antd';
+import { PlusOutlined } from '@ant-design/icons';
+import { Select, Spin } from 'antd';
+import useComponentPermission from 'hooks/useComponentPermission';
 import { State } from 'hooks/useFetch';
 import { useNotifications } from 'hooks/useNotifications';
 import { ReactNode } from 'react';
 import { useTranslation } from 'react-i18next';
+import { useSelector } from 'react-redux';
+import { AppState } from 'store/reducers';
 import { PayloadProps } from 'types/api/channels/getAll';
+import AppReducer from 'types/reducer/app';

-import { StyledSelect } from './styles';
+import { StyledCreateChannelOption, StyledSelect } from './styles';

 export interface ChannelSelectProps {
 	disabled?: boolean;
 	currentValue?: string[];
 	onSelectChannels: (s: string[]) => void;
+	onDropdownOpen: () => void;
 	channels: State<PayloadProps | undefined>;
 	handleCreateNewChannels: () => void;
 }

 function ChannelSelect({
 	disabled,
 	currentValue,
 	onSelectChannels,
+	onDropdownOpen,
 	channels,
 	handleCreateNewChannels,
 }: ChannelSelectProps): JSX.Element | null {
 	// init namespace for translations
 	const { t } = useTranslation('alerts');
@@ -26,6 +35,10 @@ function ChannelSelect({
 	const { notifications } = useNotifications();

 	const handleChange = (value: string[]): void => {
+		if (value.includes('add-new-channel')) {
+			handleCreateNewChannels();
+			return;
+		}
 		onSelectChannels(value);
 	};
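Editor's note: the guard added above is a sentinel-option pattern — one entry in the multi-select is a trigger rather than a real value, so `onChange` must intercept it before it reaches state. The same pattern in isolation (names illustrative, not from the commit):

```typescript
const ADD_NEW = 'add-new-channel';

// Hypothetical standalone version of the guard in handleChange.
function makeHandleChange(
	onAddNew: () => void,
	onSelect: (values: string[]) => void,
): (values: string[]) => void {
	return (values): void => {
		if (values.includes(ADD_NEW)) {
			onAddNew(); // the sentinel triggers an action and is never stored
			return;
		}
		onSelect(values);
	};
}
```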
@@ -35,9 +48,27 @@ function ChannelSelect({
 			description: channels.errorMessage,
 		});
 	}

+	const { role } = useSelector<AppState, AppReducer>((state) => state.app);
+	const [addNewChannelPermission] = useComponentPermission(
+		['add_new_channel'],
+		role,
+	);
+
 	const renderOptions = (): ReactNode[] => {
 		const children: ReactNode[] = [];

+		if (!channels.loading && addNewChannelPermission) {
+			children.push(
+				<Select.Option key="add-new-channel" value="add-new-channel">
+					<StyledCreateChannelOption>
+						<PlusOutlined />
+						Create a new channel
+					</StyledCreateChannelOption>
+				</Select.Option>,
+			);
+		}
+
 		if (
 			channels.loading ||
 			channels.payload === undefined ||
@@ -56,6 +87,7 @@ function ChannelSelect({

 		return children;
 	};

 	return (
 		<StyledSelect
 			disabled={disabled}
@@ -65,6 +97,12 @@ function ChannelSelect({
 			placeholder={t('placeholder_channel_select')}
 			data-testid="alert-channel-select"
 			value={currentValue}
+			notFoundContent={channels.loading && <Spin size="small" />}
+			onDropdownVisibleChange={(open): void => {
+				if (open) {
+					onDropdownOpen();
+				}
+			}}
 			onChange={(value): void => {
 				handleChange(value as string[]);
 			}}
@@ -4,3 +4,10 @@ import styled from 'styled-components';
 export const StyledSelect = styled(Select)`
 	border-radius: 4px;
 `;
+
+export const StyledCreateChannelOption = styled.div`
+	color: var(--bg-robin-500);
+	display: flex;
+	align-items: center;
+	gap: 8px;
+`;
@@ -102,9 +102,9 @@ function RuleOptions({
 					<Select.Option value="4">{t('option_notequal')}</Select.Option>
 				</>
 			)}

+			{/* the value 5 and 6 are reserved for above or equal and below or equal */}
 			{ruleType === 'anomaly_rule' && (
-				<Select.Option value="5">{t('option_above_below')}</Select.Option>
+				<Select.Option value="7">{t('option_above_below')}</Select.Option>
 			)}
 		</InlineSelect>
 	);
@@ -38,11 +38,12 @@ export enum FORMULA {
	// query C => durationNano <= 2000ms
	// Since <= 2000ms includes <= 500ms, we over count, to correct we subtract B/2
	// so the full expression would be (B + C/2) - B/2 = (B+C)/2
	// However, if you add a filter on durationNano > 500ms, (filterItemC in overviewQueries) the query would be
	// B + C/2
 	APDEX_TRACES = '((B + C)/2)/A',
-	// Does the same not apply for delta span metrics?
-	// No, because the delta metrics store the counts just for the current bucket
-	// so we don't need to subtract anything
-	APDEX_DELTA_SPAN_METRICS = '(B + C)/A',
+	// The delta span metrics store delta compared to previous reporting interval
+	// but not the counts for the current interval. The bucket counts are cumulative
+	APDEX_DELTA_SPAN_METRICS = '((B + C)/2)/A',
 	// Cumulative span metrics store the counts for all buckets
 	// so we need to subtract B/2 to correct the over counting
 	APDEX_CUMULATIVE_SPAN_METRICS = '((B + C)/2)/A',
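Editor's note: the algebra in the comment block above can be spot-checked numerically. With cumulative bucket counts, satisfied = B and tolerating = C - B, so the textbook Apdex (satisfied + tolerating/2)/total reduces to ((B + C)/2)/A — which is why this commit corrects the delta formula to match. A quick check with illustrative numbers (not from the commit):

```typescript
// A = total spans, B = spans <= 500ms (satisfied),
// C = spans <= 2000ms (cumulative, so it includes B).
const A = 100;
const B = 60;
const C = 90;

const apdexByDefinition = (B + (C - B) / 2) / A; // 0.75
const apdexByFormula = (B + C) / 2 / A; // 0.75

console.assert(apdexByDefinition === apdexByFormula);
```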
@@ -1,4 +1,4 @@
-import { useEffect, useRef, useState } from 'react';
+import { useCallback, useEffect, useState } from 'react';
 import { ErrorResponse, SuccessResponse } from 'types/api';

 function useFetch<PayloadProps, FunctionParams>(
@@ -10,7 +10,7 @@ function useFetch<PayloadProps, FunctionParams>(
 		(arg0: any): Promise<SuccessResponse<PayloadProps> | ErrorResponse>;
 	},
 	param?: FunctionParams,
-): State<PayloadProps | undefined> {
+): State<PayloadProps | undefined> & { refetch: () => Promise<void> } {
 	const [state, setStates] = useState<State<PayloadProps | undefined>>({
 		loading: true,
 		success: null,
@@ -19,37 +19,28 @@ function useFetch<PayloadProps, FunctionParams>(
 		payload: undefined,
 	});

-	const loadingRef = useRef(0);
-
-	useEffect(() => {
+	const fetchData = useCallback(async (): Promise<void> => {
+		setStates((prev) => ({ ...prev, loading: true }));
 		try {
-			(async (): Promise<void> => {
-				if (state.loading) {
-					const response = await functions(param);
+			const response = await functions(param);

-					if (loadingRef.current === 0) {
-						loadingRef.current = 1;
-
-						if (response.statusCode === 200) {
-							setStates({
-								loading: false,
-								error: false,
-								success: true,
-								payload: response.payload,
-								errorMessage: '',
-							});
-						} else {
-							setStates({
-								loading: false,
-								error: true,
-								success: false,
-								payload: undefined,
-								errorMessage: response.error as string,
-							});
-						}
-					}
-				}
-			})();
+			if (response.statusCode === 200) {
+				setStates({
+					loading: false,
+					error: false,
+					success: true,
+					payload: response.payload,
+					errorMessage: '',
+				});
+			} else {
+				setStates({
+					loading: false,
+					error: true,
+					success: false,
+					payload: undefined,
+					errorMessage: response.error as string,
+				});
+			}
 		} catch (error) {
 			setStates({
 				payload: undefined,
|
||||
errorMessage: error as string,
|
||||
});
|
||||
}
|
||||
return (): void => {
|
||||
loadingRef.current = 1;
|
||||
};
|
||||
}, [functions, param, state.loading]);
|
||||
}, [functions, param]);
|
||||
|
||||
// Initial fetch
|
||||
useEffect(() => {
|
||||
fetchData();
|
||||
}, [fetchData]);
|
||||
|
||||
return {
|
||||
...state,
|
||||
refetch: fetchData,
|
||||
};
|
||||
}
|
||||
|
||||
|
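Editor's note: after this refactor the hook memoizes the fetcher with useCallback and exposes it as `refetch`, so consumers can re-run the request imperatively (the alert channel dropdown above uses it via `onDropdownOpen`). A hedged usage sketch — component and request-function names are illustrative, not from the commit:

```tsx
import useFetch from 'hooks/useFetch';

// Hypothetical request function matching the shape the hook expects.
declare function getAllChannels(): Promise<{
	statusCode: number;
	payload: string[];
	error?: string;
}>;

function ChannelCount(): JSX.Element {
	const channels = useFetch(getAllChannels);

	return (
		<button type="button" onClick={(): Promise<void> => channels.refetch()}>
			{channels.loading ? 'Loading…' : `Reload (${channels.payload?.length ?? 0})`}
		</button>
	);
}
```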
@@ -766,307 +766,6 @@ func buildFilterArrayQuery(_ context.Context, excludeMap map[string]struct{}, pa
 	return args
 }

-func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) {
-
-	var query string
-	excludeMap := make(map[string]struct{})
-	for _, e := range queryParams.Exclude {
-		if e == constants.OperationRequest {
-			excludeMap[constants.OperationDB] = struct{}{}
-			continue
-		}
-		excludeMap[e] = struct{}{}
-	}
-
-	args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
-	if len(queryParams.TraceID) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
-	}
-	if len(queryParams.ServiceName) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
-	}
-	if len(queryParams.HttpRoute) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
-	}
-	if len(queryParams.HttpHost) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
-	}
-	if len(queryParams.HttpMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
-	}
-	if len(queryParams.HttpUrl) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
-	}
-	if len(queryParams.Operation) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
-	}
-	if len(queryParams.RPCMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
-	}
-	if len(queryParams.ResponseStatusCode) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
-	}
-
-	if len(queryParams.MinDuration) != 0 {
-		query = query + " AND durationNano >= @durationNanoMin"
-		args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
-	}
-	if len(queryParams.MaxDuration) != 0 {
-		query = query + " AND durationNano <= @durationNanoMax"
-		args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
-	}
-
-	if len(queryParams.SpanKind) != 0 {
-		query = query + " AND kind = @kind"
-		args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
-	}
-
-	query = getStatusFilters(query, queryParams.Status, excludeMap)
-
-	traceFilterReponse := model.SpanFiltersResponse{
-		Status:             map[string]uint64{},
-		Duration:           map[string]uint64{},
-		ServiceName:        map[string]uint64{},
-		Operation:          map[string]uint64{},
-		ResponseStatusCode: map[string]uint64{},
-		RPCMethod:          map[string]uint64{},
-		HttpMethod:         map[string]uint64{},
-		HttpUrl:            map[string]uint64{},
-		HttpRoute:          map[string]uint64{},
-		HttpHost:           map[string]uint64{},
-	}
-
-	for _, e := range queryParams.GetFilters {
-		switch e {
-		case constants.TraceID:
-			continue
-		case constants.ServiceName:
-			finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-			finalQuery += query
-			finalQuery += " GROUP BY serviceName"
-			var dBResponse []model.DBResponseServiceName
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			for _, service := range dBResponse {
-				if service.ServiceName != "" {
-					traceFilterReponse.ServiceName[service.ServiceName] = service.Count
-				}
-			}
-		case constants.HttpRoute:
-			finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-			finalQuery += query
-			finalQuery += " GROUP BY httpRoute"
-			var dBResponse []model.DBResponseHttpRoute
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			for _, service := range dBResponse {
-				if service.HttpRoute != "" {
-					traceFilterReponse.HttpRoute[service.HttpRoute] = service.Count
-				}
-			}
-		case constants.HttpUrl:
-			finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-			finalQuery += query
-			finalQuery += " GROUP BY httpUrl"
-			var dBResponse []model.DBResponseHttpUrl
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			for _, service := range dBResponse {
-				if service.HttpUrl != "" {
-					traceFilterReponse.HttpUrl[service.HttpUrl] = service.Count
-				}
-			}
-		case constants.HttpMethod:
-			finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-			finalQuery += query
-			finalQuery += " GROUP BY httpMethod"
-			var dBResponse []model.DBResponseHttpMethod
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			for _, service := range dBResponse {
-				if service.HttpMethod != "" {
-					traceFilterReponse.HttpMethod[service.HttpMethod] = service.Count
-				}
-			}
-		case constants.HttpHost:
-			finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-			finalQuery += query
-			finalQuery += " GROUP BY httpHost"
-			var dBResponse []model.DBResponseHttpHost
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			for _, service := range dBResponse {
-				if service.HttpHost != "" {
-					traceFilterReponse.HttpHost[service.HttpHost] = service.Count
-				}
-			}
-		case constants.OperationRequest:
-			finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-			finalQuery += query
-			finalQuery += " GROUP BY name"
-			var dBResponse []model.DBResponseOperation
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			for _, service := range dBResponse {
-				if service.Operation != "" {
-					traceFilterReponse.Operation[service.Operation] = service.Count
-				}
-			}
-		case constants.Status:
-			finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable)
-			finalQuery += query
-			var dBResponse []model.DBResponseTotal
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-
-			finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable)
-			finalQuery2 += query
-			var dBResponse2 []model.DBResponseTotal
-			err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...)
-			zap.L().Info(finalQuery2)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			if len(dBResponse) > 0 && len(dBResponse2) > 0 {
-				traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": dBResponse[0].NumTotal}
-			} else if len(dBResponse) > 0 {
-				traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": dBResponse[0].NumTotal}
-			} else if len(dBResponse2) > 0 {
-				traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": 0}
-			} else {
-				traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": 0}
-			}
-		case constants.Duration:
-			err := r.featureFlags.CheckFeature(constants.DurationSort)
-			durationSortEnabled := err == nil
-			finalQuery := ""
-			if !durationSortEnabled {
-				// if duration sort is not enabled, we need to get the min and max duration from the index table
-				finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-				finalQuery += query
-				var dBResponse []model.DBResponseMinMax
-				err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
-				zap.L().Info(finalQuery)
-				if err != nil {
-					zap.L().Error("Error in processing sql query", zap.Error(err))
-					return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-				}
-				if len(dBResponse) > 0 {
-					traceFilterReponse.Duration = map[string]uint64{"minDuration": dBResponse[0].Min, "maxDuration": dBResponse[0].Max}
-				}
-			} else {
-				// when duration sort is enabled, we need to get the min and max duration from the duration table
-				finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable)
-				finalQuery += query
-				finalQuery += " ORDER BY durationNano LIMIT 1"
-				var dBResponse []model.DBResponseTotal
-				err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
-				zap.L().Info(finalQuery)
-
-				if err != nil {
-					zap.L().Error("Error in processing sql query", zap.Error(err))
-					return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-				}
-
-				finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable)
-				finalQuery += query
-				finalQuery += " ORDER BY durationNano DESC LIMIT 1"
-				var dBResponse2 []model.DBResponseTotal
-				err = r.db.Select(ctx, &dBResponse2, finalQuery, args...)
-				zap.L().Info(finalQuery)
-
-				if err != nil {
-					zap.L().Error("Error in processing sql query", zap.Error(err))
-					return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-				}
-				if len(dBResponse) > 0 {
-					traceFilterReponse.Duration["minDuration"] = dBResponse[0].NumTotal
-				}
-				if len(dBResponse2) > 0 {
-					traceFilterReponse.Duration["maxDuration"] = dBResponse2[0].NumTotal
-				}
-			}
-		case constants.RPCMethod:
-			finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-			finalQuery += query
-			finalQuery += " GROUP BY rpcMethod"
-			var dBResponse []model.DBResponseRPCMethod
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			for _, service := range dBResponse {
-				if service.RPCMethod != "" {
-					traceFilterReponse.RPCMethod[service.RPCMethod] = service.Count
-				}
-			}
-
-		case constants.ResponseStatusCode:
-			finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
-			finalQuery += query
-			finalQuery += " GROUP BY responseStatusCode"
-			var dBResponse []model.DBResponseStatusCodeMethod
-			err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
-			zap.L().Info(finalQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
-			}
-			for _, service := range dBResponse {
-				if service.ResponseStatusCode != "" {
-					traceFilterReponse.ResponseStatusCode[service.ResponseStatusCode] = service.Count
-				}
-			}
-
-		default:
-			return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("filter type: %s not supported", e)}
-		}
-	}
-
-	return &traceFilterReponse, nil
-}
-
 func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {

 	// status can only be two and if both are selected than they are equivalent to none selected
@@ -1088,140 +787,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
 	return query
 }

-func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) {
-
-	queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable)
-
-	excludeMap := make(map[string]struct{})
-	for _, e := range queryParams.Exclude {
-		if e == constants.OperationRequest {
-			excludeMap[constants.OperationDB] = struct{}{}
-			continue
-		}
-		excludeMap[e] = struct{}{}
-	}
-
-	var query string
-	args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
-	if len(queryParams.TraceID) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
-	}
-	if len(queryParams.ServiceName) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
-	}
-	if len(queryParams.HttpRoute) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
-	}
-	if len(queryParams.HttpHost) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
-	}
-	if len(queryParams.HttpMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
-	}
-	if len(queryParams.HttpUrl) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
-	}
-	if len(queryParams.Operation) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
-	}
-	if len(queryParams.RPCMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
-	}
-
-	if len(queryParams.ResponseStatusCode) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
-	}
-
-	if len(queryParams.MinDuration) != 0 {
-		query = query + " AND durationNano >= @durationNanoMin"
-		args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
-	}
-	if len(queryParams.MaxDuration) != 0 {
-		query = query + " AND durationNano <= @durationNanoMax"
-		args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
-	}
-	query = getStatusFilters(query, queryParams.Status, excludeMap)
-
-	if len(queryParams.SpanKind) != 0 {
-		query = query + " AND kind = @kind"
-		args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
-	}
-
-	// create TagQuery from TagQueryParams
-	tags := createTagQueryFromTagQueryParams(queryParams.Tags)
-	subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
-	query += subQuery
-	args = append(args, argsSubQuery...)
-	if errStatus != nil {
-		return nil, errStatus
-	}
-
-	if len(queryParams.OrderParam) != 0 {
-		if queryParams.OrderParam == constants.Duration {
-			queryTable = fmt.Sprintf("%s.%s", r.TraceDB, r.durationTable)
-			if queryParams.Order == constants.Descending {
-				query = query + " ORDER BY durationNano DESC"
-			}
-			if queryParams.Order == constants.Ascending {
-				query = query + " ORDER BY durationNano ASC"
-			}
-		} else if queryParams.OrderParam == constants.Timestamp {
-			projectionOptQuery := "SET allow_experimental_projection_optimization = 1"
-			err := r.db.Exec(ctx, projectionOptQuery)
-
-			zap.L().Info(projectionOptQuery)
-
-			if err != nil {
-				zap.L().Error("Error in processing sql query", zap.Error(err))
-				return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
-			}
-			if queryParams.Order == constants.Descending {
-				query = query + " ORDER BY timestamp DESC"
-			}
-			if queryParams.Order == constants.Ascending {
-				query = query + " ORDER BY timestamp ASC"
-			}
-		}
-	}
-	if queryParams.Limit > 0 {
-		query = query + " LIMIT @limit"
-		args = append(args, clickhouse.Named("limit", queryParams.Limit))
-	}
-
-	if queryParams.Offset > 0 {
-		query = query + " OFFSET @offset"
-		args = append(args, clickhouse.Named("offset", queryParams.Offset))
-	}
-
-	var getFilterSpansResponseItems []model.GetFilterSpansResponseItem
-
-	baseQuery := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, durationNano, httpMethod, rpcMethod, responseStatusCode FROM %s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryTable)
-	baseQuery += query
-	err := r.db.Select(ctx, &getFilterSpansResponseItems, baseQuery, args...)
-	// Fill status and method
-	for i, e := range getFilterSpansResponseItems {
-		if e.RPCMethod != "" {
-			getFilterSpansResponseItems[i].Method = e.RPCMethod
-		} else {
-			getFilterSpansResponseItems[i].Method = e.HttpMethod
-		}
-	}
-
-	zap.L().Info(baseQuery)
-
-	if err != nil {
-		zap.L().Error("Error in processing sql query", zap.Error(err))
-		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
-	}
-
-	getFilterSpansResponse := model.GetFilterSpansResponse{
-		Spans:      getFilterSpansResponseItems,
-		TotalSpans: 1000,
-	}
-
-	return &getFilterSpansResponse, nil
-}
-
 func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
 	tags := []model.TagQuery{}
 	for _, tag := range queryParams {
@@ -1379,87 +944,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
 	return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
 }

-func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagFilters, *model.ApiError) {
-
-	excludeMap := make(map[string]struct{})
-	for _, e := range queryParams.Exclude {
-		if e == constants.OperationRequest {
-			excludeMap[constants.OperationDB] = struct{}{}
-			continue
-		}
-		excludeMap[e] = struct{}{}
-	}
-
-	var query string
-	args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
-	if len(queryParams.TraceID) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
-	}
-	if len(queryParams.ServiceName) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
-	}
-	if len(queryParams.HttpRoute) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
-	}
-	if len(queryParams.HttpHost) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
-	}
-	if len(queryParams.HttpMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
-	}
-	if len(queryParams.HttpUrl) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
-	}
-	if len(queryParams.Operation) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
-	}
-	if len(queryParams.RPCMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
-	}
-	if len(queryParams.ResponseStatusCode) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
-	}
-	if len(queryParams.MinDuration) != 0 {
-		query = query + " AND durationNano >= @durationNanoMin"
-		args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
-	}
-	if len(queryParams.MaxDuration) != 0 {
-		query = query + " AND durationNano <= @durationNanoMax"
-		args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
-	}
-	if len(queryParams.SpanKind) != 0 {
-		query = query + " AND kind = @kind"
-		args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
-	}
-
-	query = getStatusFilters(query, queryParams.Status, excludeMap)
-
-	tagFilters := []model.TagFilters{}
-
-	// Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
-	finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
-	finalQuery += query
-	err := r.db.Select(ctx, &tagFilters, finalQuery, args...)
-
-	zap.L().Info(query)
-
-	if err != nil {
-		zap.L().Error("Error in processing sql query", zap.Error(err))
-		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
-	}
-	tagFiltersResult := model.TagFilters{
-		StringTagKeys: make([]string, 0),
-		NumberTagKeys: make([]string, 0),
-		BoolTagKeys:   make([]string, 0),
-	}
-	if len(tagFilters) != 0 {
-		tagFiltersResult.StringTagKeys = excludeTags(ctx, tagFilters[0].StringTagKeys)
-		tagFiltersResult.NumberTagKeys = excludeTags(ctx, tagFilters[0].NumberTagKeys)
-		tagFiltersResult.BoolTagKeys = excludeTags(ctx, tagFilters[0].BoolTagKeys)
-	}
-	return &tagFiltersResult, nil
-}
-
 func excludeTags(_ context.Context, tags []string) []string {
 	excludedTagsMap := map[string]bool{
 		"http.code": true,
@@ -1483,102 +967,6 @@ func excludeTags(_ context.Context, tags []string) []string {
 	return newTags
 }

-func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagValues, *model.ApiError) {
-
-	if queryParams.TagKey.Type == model.TagTypeNumber {
-		return &model.TagValues{
-			NumberTagValues: make([]float64, 0),
-			StringTagValues: make([]string, 0),
-			BoolTagValues:   make([]bool, 0),
-		}, nil
-	} else if queryParams.TagKey.Type == model.TagTypeBool {
-		return &model.TagValues{
-			NumberTagValues: make([]float64, 0),
-			StringTagValues: make([]string, 0),
-			BoolTagValues:   []bool{true, false},
-		}, nil
-	}
-
-	excludeMap := make(map[string]struct{})
-	for _, e := range queryParams.Exclude {
-		if e == constants.OperationRequest {
-			excludeMap[constants.OperationDB] = struct{}{}
-			continue
-		}
-		excludeMap[e] = struct{}{}
-	}
-
-	var query string
-	args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
-	if len(queryParams.TraceID) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
-	}
-	if len(queryParams.ServiceName) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
-	}
-	if len(queryParams.HttpRoute) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
-	}
-	if len(queryParams.HttpHost) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
-	}
-	if len(queryParams.HttpMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
-	}
-	if len(queryParams.HttpUrl) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
-	}
-	if len(queryParams.Operation) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
-	}
-	if len(queryParams.MinDuration) != 0 {
-		query = query + " AND durationNano >= @durationNanoMin"
-		args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
-	}
-	if len(queryParams.MaxDuration) != 0 {
-		query = query + " AND durationNano <= @durationNanoMax"
-		args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
-	}
-	if len(queryParams.SpanKind) != 0 {
-		query = query + " AND kind = @kind"
-		args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
-	}
-
-	query = getStatusFilters(query, queryParams.Status, excludeMap)
-
-	tagValues := []model.TagValues{}
-
-	finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
-	finalQuery += query
-	finalQuery += " LIMIT @limit"
-
-	args = append(args, clickhouse.Named("key", queryParams.TagKey.Key))
-	args = append(args, clickhouse.Named("limit", queryParams.Limit))
-	err := r.db.Select(ctx, &tagValues, finalQuery, args...)
-
-	zap.L().Info(query)
-
-	if err != nil {
-		zap.L().Error("Error in processing sql query", zap.Error(err))
-		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
-	}
-
-	cleanedTagValues := model.TagValues{
-		StringTagValues: []string{},
-		NumberTagValues: []float64{},
-		BoolTagValues:   []bool{},
-	}
-	if len(tagValues) == 0 {
-		return &cleanedTagValues, nil
-	}
-	for _, e := range tagValues[0].StringTagValues {
-		if e != "" {
-			cleanedTagValues.StringTagValues = append(cleanedTagValues.StringTagValues, e)
-		}
-	}
-	return &cleanedTagValues, nil
-}
-
 func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {

 	namedArgs := []interface{}{
@@ -1823,185 +1211,6 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
 	return &response, nil
 }

-func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) {
-
-	excludeMap := make(map[string]struct{})
-	for _, e := range queryParams.Exclude {
-		if e == constants.OperationRequest {
-			excludeMap[constants.OperationDB] = struct{}{}
-			continue
-		}
-		excludeMap[e] = struct{}{}
-	}
-
-	SpanAggregatesDBResponseItems := []model.SpanAggregatesDBResponseItem{}
-
-	aggregation_query := ""
-	if queryParams.Dimension == "duration" {
-		switch queryParams.AggregationOption {
-		case "p50":
-			aggregation_query = " quantile(0.50)(durationNano) as float64Value "
-		case "p95":
-			aggregation_query = " quantile(0.95)(durationNano) as float64Value "
-		case "p90":
-			aggregation_query = " quantile(0.90)(durationNano) as float64Value "
-		case "p99":
-			aggregation_query = " quantile(0.99)(durationNano) as float64Value "
-		case "max":
-			aggregation_query = " max(durationNano) as value "
-		case "min":
-			aggregation_query = " min(durationNano) as value "
-		case "avg":
-			aggregation_query = " avg(durationNano) as float64Value "
-		case "sum":
-			aggregation_query = " sum(durationNano) as value "
-		default:
-			return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("aggregate type: %s not supported", queryParams.AggregationOption)}
-		}
-	} else if queryParams.Dimension == "calls" {
-		aggregation_query = " count(*) as value "
-	}
-
-	args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
-
-	var query string
-	var customStr []string
-	_, columnExists := constants.GroupByColMap[queryParams.GroupBy]
-	// Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet:
-	// issue link: https://github.com/ClickHouse/clickhouse-go/issues/870
-	if queryParams.GroupBy != "" && columnExists {
-		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable)
-		args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy))
-	} else if queryParams.GroupBy != "" {
-		customStr = strings.Split(queryParams.GroupBy, ".(")
-		if len(customStr) < 2 {
-			return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
-		}
-		if customStr[1] == string(model.TagTypeString)+")" {
-			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
-		} else if customStr[1] == string(model.TagTypeNumber)+")" {
-			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
-		} else if customStr[1] == string(model.TagTypeBool)+")" {
-			query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
-		} else {
-			// return error for unsupported group by
-			return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
-		}
-	} else {
-		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
-	}
-
-	if len(queryParams.TraceID) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
-	}
-	if len(queryParams.ServiceName) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
-	}
-	if len(queryParams.HttpRoute) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
-	}
-	if len(queryParams.HttpHost) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
-	}
-	if len(queryParams.HttpMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
-	}
-	if len(queryParams.HttpUrl) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
-	}
-	if len(queryParams.Operation) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
-	}
-	if len(queryParams.RPCMethod) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
-	}
-	if len(queryParams.ResponseStatusCode) > 0 {
-		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
-	}
-	if len(queryParams.MinDuration) != 0 {
-		query = query + " AND durationNano >= @durationNanoMin"
-		args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
-	}
-	if len(queryParams.MaxDuration) != 0 {
-		query = query + " AND durationNano <= @durationNanoMax"
-		args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
-	}
-	query = getStatusFilters(query, queryParams.Status, excludeMap)
-
-	if len(queryParams.SpanKind) != 0 {
-		query = query + " AND kind = @kind"
-		args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
-	}
-	// create TagQuery from TagQueryParams
-	tags := createTagQueryFromTagQueryParams(queryParams.Tags)
-	subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
-	query += subQuery
-	args = append(args, argsSubQuery...)
-
-	if errStatus != nil {
-		return nil, errStatus
-	}
-
-	if queryParams.GroupBy != "" && columnExists {
-		query = query + fmt.Sprintf(" GROUP BY time, %s as groupBy ORDER BY time", queryParams.GroupBy)
-	} else if queryParams.GroupBy != "" {
-		if customStr[1] == string(model.TagTypeString)+")" {
-			query = query + fmt.Sprintf(" GROUP BY time, stringTagMap['%s'] as groupBy ORDER BY time", customStr[0])
-		} else if customStr[1] == string(model.TagTypeNumber)+")" {
-			query = query + fmt.Sprintf(" GROUP BY time, toString(numberTagMap['%s']) as groupBy ORDER BY time", customStr[0])
-		} else if customStr[1] == string(model.TagTypeBool)+")" {
-			query = query + fmt.Sprintf(" GROUP BY time, toString(boolTagMap['%s']) as groupBy ORDER BY time", customStr[0])
-		}
-	} else {
-		query = query + " GROUP BY time ORDER BY time"
-	}
-
-	err := r.db.Select(ctx, &SpanAggregatesDBResponseItems, query, args...)
-
-	zap.L().Info(query)
-
-	if err != nil {
-		zap.L().Error("Error in processing sql query", zap.Error(err))
-		return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
-	}
-
-	GetFilteredSpansAggregatesResponse := model.GetFilteredSpansAggregatesResponse{
-		Items: map[int64]model.SpanAggregatesResponseItem{},
-	}
-
-	for i := range SpanAggregatesDBResponseItems {
-		if SpanAggregatesDBResponseItems[i].Value == 0 {
-			SpanAggregatesDBResponseItems[i].Value = uint64(SpanAggregatesDBResponseItems[i].Float64Value)
-		}
-		SpanAggregatesDBResponseItems[i].Timestamp = int64(SpanAggregatesDBResponseItems[i].Time.UnixNano())
-		SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value)
-		if queryParams.AggregationOption == "rate_per_sec" {
-			SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value) / float32(queryParams.StepSeconds)
-		}
-		if responseElement, ok := GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp]; !ok {
-			if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" {
-				GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{
-					Timestamp: SpanAggregatesDBResponseItems[i].Timestamp,
-					GroupBy:   map[string]float32{SpanAggregatesDBResponseItems[i].GroupBy: SpanAggregatesDBResponseItems[i].FloatValue},
-				}
-			} else if queryParams.GroupBy == "" {
-				GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{
-					Timestamp: SpanAggregatesDBResponseItems[i].Timestamp,
-					Value:     SpanAggregatesDBResponseItems[i].FloatValue,
-				}
-			}
-
-		} else {
-			if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" {
-				responseElement.GroupBy[SpanAggregatesDBResponseItems[i].GroupBy] = SpanAggregatesDBResponseItems[i].FloatValue
-			}
-			GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = responseElement
-		}
-	}
-
-	return &GetFilteredSpansAggregatesResponse, nil
-}
-
 func getLocalTableName(tableName string) string {

 	tableNameSplit := strings.Split(tableName, ".")
@@ -526,12 +526,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
 	router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet)
 	router.HandleFunc("/api/v1/health", am.OpenAccess(aH.getHealth)).Methods(http.MethodGet)

-	router.HandleFunc("/api/v1/getSpanFilters", am.ViewAccess(aH.getSpanFilters)).Methods(http.MethodPost)
-	router.HandleFunc("/api/v1/getTagFilters", am.ViewAccess(aH.getTagFilters)).Methods(http.MethodPost)
-	router.HandleFunc("/api/v1/getFilteredSpans", am.ViewAccess(aH.getFilteredSpans)).Methods(http.MethodPost)
-	router.HandleFunc("/api/v1/getFilteredSpans/aggregates", am.ViewAccess(aH.getFilteredSpanAggregates)).Methods(http.MethodPost)
-	router.HandleFunc("/api/v1/getTagValues", am.ViewAccess(aH.getTagValues)).Methods(http.MethodPost)
-
 	router.HandleFunc("/api/v1/listErrors", am.ViewAccess(aH.listErrors)).Methods(http.MethodPost)
 	router.HandleFunc("/api/v1/countErrors", am.ViewAccess(aH.countErrors)).Methods(http.MethodPost)
 	router.HandleFunc("/api/v1/errorFromErrorID", am.ViewAccess(aH.getErrorFromErrorID)).Methods(http.MethodGet)
@@ -1847,86 +1841,6 @@ func (aH *APIHandler) getErrorFromGroupID(w http.ResponseWriter, r *http.Request
 	aH.WriteJSON(w, r, result)
 }

-func (aH *APIHandler) getSpanFilters(w http.ResponseWriter, r *http.Request) {
-
-	query, err := parseSpanFilterRequestBody(r)
-	if aH.HandleError(w, err, http.StatusBadRequest) {
-		return
-	}
-
-	result, apiErr := aH.reader.GetSpanFilters(r.Context(), query)
-
-	if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
-		return
-	}
-
-	aH.WriteJSON(w, r, result)
-}
-
-func (aH *APIHandler) getFilteredSpans(w http.ResponseWriter, r *http.Request) {
-
-	query, err := parseFilteredSpansRequest(r, aH)
-	if aH.HandleError(w, err, http.StatusBadRequest) {
-		return
-	}
-
-	result, apiErr := aH.reader.GetFilteredSpans(r.Context(), query)
-
-	if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
-		return
-	}
-
-	aH.WriteJSON(w, r, result)
-}
-
-func (aH *APIHandler) getFilteredSpanAggregates(w http.ResponseWriter, r *http.Request) {
-
-	query, err := parseFilteredSpanAggregatesRequest(r)
-	if aH.HandleError(w, err, http.StatusBadRequest) {
-		return
-	}
-
-	result, apiErr := aH.reader.GetFilteredSpansAggregates(r.Context(), query)
-
-	if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
-		return
-	}
-
-	aH.WriteJSON(w, r, result)
-}
-
-func (aH *APIHandler) getTagFilters(w http.ResponseWriter, r *http.Request) {
-
-	query, err := parseTagFilterRequest(r)
-	if aH.HandleError(w, err, http.StatusBadRequest) {
-		return
-	}
-
-	result, apiErr := aH.reader.GetTagFilters(r.Context(), query)
-
-	if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
-		return
-	}
-
-	aH.WriteJSON(w, r, result)
-}
-
-func (aH *APIHandler) getTagValues(w http.ResponseWriter, r *http.Request) {
-
-	query, err := parseTagValueRequest(r)
-	if aH.HandleError(w, err, http.StatusBadRequest) {
-		return
-	}
-
-	result, apiErr := aH.reader.GetTagValues(r.Context(), query)
-
-	if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
-		return
-	}
-
-	aH.WriteJSON(w, r, result)
-}
-
 func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
 	ttlParams, err := parseTTLParams(r)
 	if aH.HandleError(w, err, http.StatusBadRequest) {
@ -142,7 +142,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu
	}

	// check if the field is present in the fields map
	for _, key := range utils.GenerateLogEnrichmentKeys(field) {
	for _, key := range utils.GenerateEnrichmentKeys(field) {
		if val, ok := fields[key]; ok {
			return val
		}
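Note on the change above: utils.GenerateLogEnrichmentKeys becomes utils.GenerateEnrichmentKeys because the same candidate-key generation now serves traces as well as logs. Judging from the metadata maps in the tests later in this commit (entries such as "bytes##tag##int64" and "http.route##tag##string"), the candidates follow a key##type##datatype shape. A hypothetical, self-contained Go sketch of that idea follows; the real helper's variants and their ordering are not shown in this diff, so treat everything here as an assumption:

    package main

    import "fmt"

    // AttributeKey is a local stand-in for v3.AttributeKey; only the
    // "key##type##datatype" shape is taken from the tests in this commit.
    type AttributeKey struct {
        Key      string
        Type     string // e.g. "tag" or "resource"
        DataType string // e.g. "string", "int64", "float64", "bool"
    }

    // generateEnrichmentKeys emits candidate lookup keys for an attribute:
    // the exact key when type and datatype are known, otherwise every
    // combination. The variant ordering here is an assumption.
    func generateEnrichmentKeys(field AttributeKey) []string {
        if field.Type != "" && field.DataType != "" {
            return []string{fmt.Sprintf("%s##%s##%s", field.Key, field.Type, field.DataType)}
        }
        keys := []string{}
        for _, typ := range []string{"tag", "resource"} {
            for _, dt := range []string{"string", "int64", "float64", "bool"} {
                keys = append(keys, fmt.Sprintf("%s##%s##%s", field.Key, typ, dt))
            }
        }
        return keys
    }

    func main() {
        fmt.Println(generateEnrichmentKeys(AttributeKey{Key: "bytes", Type: "tag", DataType: "int64"}))
        // [bytes##tag##int64]
    }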
@ -12,6 +12,7 @@ import (
	"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
	tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
	"go.signoz.io/signoz/pkg/query-service/common"
	"go.signoz.io/signoz/pkg/query-service/constants"
	chErrors "go.signoz.io/signoz/pkg/query-service/errors"
	"go.signoz.io/signoz/pkg/query-service/querycache"
	"go.signoz.io/signoz/pkg/query-service/utils"
@ -52,7 +53,8 @@ type querier struct {
	returnedSeries []*v3.Series
	returnedErr    error

	UseLogsNewSchema bool
	UseLogsNewSchema  bool
	UseTraceNewSchema bool
}

type QuerierOptions struct {
@ -308,56 +310,121 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
	return results, errQueriesByName, err
}

func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
	res := make([]*v3.Result, 0)
	qName := ""
	pageSize := uint64(0)
	limit := uint64(0)
	offset := uint64(0)

	// since we are considering only one query
	for name, v := range params.CompositeQuery.BuilderQueries {
		qName = name
		pageSize = v.PageSize

		// for traces specifically
		limit = v.Limit
		offset = v.Offset
	}
	data := []*v3.Row{}

	tracesLimit := limit + offset

	for _, v := range tsRanges {
		params.Start = v.Start
		params.End = v.End

		params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
		queries, err := q.builder.PrepareQueries(params)
		if err != nil {
			return nil, nil, err
		}

		length := uint64(0)
		// this will run only once
		for name, query := range queries {
			rowList, err := q.reader.GetListResultV3(ctx, query)

		// appending the filter to get the next set of data
		if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
			params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
			queries, err := q.builder.PrepareQueries(params)
			if err != nil {
				errs := []error{err}
				errQuriesByName := map[string]error{
					name: err,
				}
				return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
				return nil, nil, err
			}
			for name, query := range queries {
				rowList, err := q.reader.GetListResultV3(ctx, query)
				if err != nil {
					errs := []error{err}
					errQueriesByName := map[string]error{
						name: err,
					}
					return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
				}
				length += uint64(len(rowList))
				data = append(data, rowList...)
			}
			data = append(data, rowList...)
		}

		// append a filter to the params
		if len(data) > 0 {
			params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
				Key: v3.AttributeKey{
					Key:      "id",
					IsColumn: true,
					DataType: "string",
				},
				Operator: v3.FilterOperatorLessThan,
				Value:    data[len(data)-1].Data["id"],
			})
		}
		if length > 0 {
			params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
				Key: v3.AttributeKey{
					Key:      "id",
					IsColumn: true,
					DataType: "string",
				},
				Operator: v3.FilterOperatorLessThan,
				Value:    data[len(data)-1].Data["id"],
			})
		}

		if uint64(len(data)) >= pageSize {
			break
			if uint64(len(data)) >= pageSize {
				break
			}
		} else {
			// TRACE
			// we are updating the offset and limit based on the number of traces we have found in the current timerange
			// e.g.:
			// 1) offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
			//
			// if 100 traces are there in [t1, t10] then 100 will be returned immediately.
			// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, and search in the next timerange of [t10, 20]
			// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100

			//
			// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
			//
			// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we immediately return 100 traces
			// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
			// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0

			// max limit + offset is 10k for pagination
			if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
				return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
			}

			params.CompositeQuery.BuilderQueries[qName].Offset = 0
			params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
			queries, err := q.builder.PrepareQueries(params)
			if err != nil {
				return nil, nil, err
			}
			for name, query := range queries {
				rowList, err := q.reader.GetListResultV3(ctx, query)
				if err != nil {
					errs := []error{err}
					errQueriesByName := map[string]error{
						name: err,
					}
					return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
				}
				length += uint64(len(rowList))

				// skip the traces unless offset is 0
				for _, row := range rowList {
					if offset == 0 {
						data = append(data, row)
					} else {
						offset--
					}
				}
			}
			tracesLimit = tracesLimit - length

			if uint64(len(data)) >= limit {
				break
			}
		}
	}
	res = append(res, &v3.Result{
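Note: the TRACE branch above is easiest to follow as plain arithmetic. Below is a minimal, self-contained sketch of the same bookkeeping with illustrative names; only the tracesLimit = limit + offset accounting, the per-row offset skipping, and the shrinking limit mirror the code:

    package main

    import "fmt"

    // paginate walks the per-window result counts the way the TRACE
    // branch walks tsRanges: each window is queried with LIMIT tracesLimit,
    // rows are discarded until the offset is used up, and tracesLimit
    // shrinks by the number of rows seen.
    func paginate(rowsPerWindow []int, limit, offset int) int {
        collected := 0
        tracesLimit := limit + offset
        for _, available := range rowsPerWindow {
            n := available
            if n > tracesLimit {
                n = tracesLimit // the window query is capped at tracesLimit
            }
            for j := 0; j < n; j++ {
                if offset == 0 {
                    collected++ // row is kept
                } else {
                    offset-- // row is skipped until the offset is exhausted
                }
            }
            tracesLimit -= n
            if collected >= limit {
                break
            }
        }
        return collected
    }

    func main() {
        // offset = 50, limit = 100: the 50 rows in the first window are
        // skipped, and the next window fills the remaining 100.
        fmt.Println(paginate([]int{50, 120}, 100, 50)) // 100
    }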
@ -368,15 +435,25 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar
}

func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
	// List query has support for only one query.
	if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 {
	// List query has support for only one query
	// we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
	if params.CompositeQuery != nil &&
		len(params.CompositeQuery.BuilderQueries) == 1 &&
		params.CompositeQuery.PanelType != v3.PanelTypeTrace {
		for _, v := range params.CompositeQuery.BuilderQueries {
			if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
				(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
				break
			}

			// only allow logs queries with timestamp ordering desc
			if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
				startEndArr := utils.GetLogsListTsRanges(params.Start, params.End)
				if len(startEndArr) > 0 {
					return q.runLogsListQuery(ctx, params, startEndArr)
				}
			// TODO(nitya): allow for timestamp asc
			if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
				len(v.OrderBy) == 1 &&
				v.OrderBy[0].ColumnName == "timestamp" &&
				v.OrderBy[0].Order == "desc" {
				startEndArr := utils.GetListTsRanges(params.Start, params.End)
				return q.runWindowBasedListQuery(ctx, params, startEndArr)
			}
		}
	}
@ -408,13 +485,13 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
	close(ch)

	var errs []error
	errQuriesByName := make(map[string]error)
	errQueriesByName := make(map[string]error)
	res := make([]*v3.Result, 0)
	// read values from the channel
	for r := range ch {
		if r.Err != nil {
			errs = append(errs, r.Err)
			errQuriesByName[r.Name] = r.Err
			errQueriesByName[r.Name] = r.Err
			continue
		}
		res = append(res, &v3.Result{
@ -423,7 +500,7 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
		})
	}
	if len(errs) != 0 {
		return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
		return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
	}
	return res, nil, nil
}
@ -5,15 +5,21 @@ import (
	"encoding/json"
	"fmt"
	"math"
	"regexp"
	"strings"
	"testing"
	"time"

	cmock "github.com/srikanthccv/ClickHouse-go-mock"
	"github.com/stretchr/testify/require"
	"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
	"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
	tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
	"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
	"go.signoz.io/signoz/pkg/query-service/featureManager"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/querycache"
	"go.signoz.io/signoz/pkg/query-service/utils"
)

func minTimestamp(series []*v3.Series) int64 {
@ -1124,3 +1130,304 @@ func TestQueryRangeValueTypePromQL(t *testing.T) {
		}
	}
}

type regexMatcher struct {
}

func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
	re, err := regexp.Compile(expectedSQL)
	if err != nil {
		return err
	}
	if !re.MatchString(actualSQL) {
		return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
	}
	return nil
}

func Test_querier_runWindowBasedListQuery(t *testing.T) {
	params := &v3.QueryRangeParamsV3{
		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
		CompositeQuery: &v3.CompositeQuery{
			PanelType: v3.PanelTypeList,
			BuilderQueries: map[string]*v3.BuilderQuery{
				"A": {
					QueryName:         "A",
					Expression:        "A",
					DataSource:        v3.DataSourceTraces,
					PageSize:          10,
					Limit:             100,
					StepInterval:      60,
					AggregateOperator: v3.AggregateOperatorNoOp,
					SelectColumns:     []v3.AttributeKey{{Key: "serviceName"}},
					Filters: &v3.FilterSet{
						Operator: "AND",
						Items:    []v3.FilterItem{},
					},
				},
			},
		},
	}

	tsRanges := []utils.LogsListTsRange{
		{
			Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
			End:   1722262800000000000, // July 29, 2024 7:50:00 PM
		},
		{
			Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
			End:   1722259200000000000, // July 29, 2024 6:50:00 PM
		},
		{
			Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
			End:   1722252000000000000, // July 29, 2024 4:50:00 PM
		},
		{
			Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
			End:   1722237600000000000, // July 29, 2024 12:50:00 PM
		},
		{
			Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
			End:   1722208800000000000, // July 29, 2024 4:50:00 AM
		},
	}

	type queryParams struct {
		start  int64
		end    int64
		limit  uint64
		offset uint64
	}

	type queryResponse struct {
		expectedQuery string
		timestamps    []uint64
	}

	// create test struct with mock data, i.e. array of timestamps, limit, offset and expected results
	testCases := []struct {
		name               string
		queryResponses     []queryResponse
		queryParams        queryParams
		expectedTimestamps []int64
		expectedError      bool
	}{
		{
			name: "should return correct timestamps when querying within time window",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  2,
				offset: 0,
			},
			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
		},
		{
			name: "all data not in first windows",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
					timestamps:    []uint64{1722253000000000000},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  3,
				offset: 0,
			},
			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
		},
		{
			name: "data in multiple windows",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
					timestamps:    []uint64{1722253000000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2",
					timestamps:    []uint64{1722237700000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1",
					timestamps:    []uint64{},
				},
				{
					expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1",
					timestamps:    []uint64{},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  5,
				offset: 0,
			},
			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
		},
		{
			name: "query with offset",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
					timestamps:    []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
					timestamps:    []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
					timestamps:    []uint64{1722237700000000000},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  4,
				offset: 3,
			},
			expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
		},
		{
			name: "query with offset and limit- data spread across multiple windows",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11",
					timestamps:    []uint64{},
				},
				{
					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
					timestamps:    []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
					timestamps:    []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
					timestamps:    []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  5,
				offset: 6,
			},
			expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
		},
		{
			name:           "don't allow pagination to get more than 10k spans",
			queryResponses: []queryResponse{},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  10,
				offset: 9991,
			},
			expectedError: true,
		},
	}

	cols := []cmock.ColumnType{
		{Name: "timestamp", Type: "UInt64"},
		{Name: "name", Type: "String"},
	}
	testName := "name"

	options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")

	// iterate over test data, create reader and run test
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Setup mock
			mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{})
			require.NoError(t, err, "Failed to create ClickHouse mock")

			// Configure mock responses
			for _, response := range tc.queryResponses {
				values := make([][]any, 0, len(response.timestamps))
				for _, ts := range response.timestamps {
					values = append(values, []any{&ts, &testName})
				}
				// if len(values) > 0 {
				mock.ExpectQuery(response.expectedQuery).WillReturnRows(
					cmock.NewRows(cols, values),
				)
				// }
			}

			// Create reader and querier
			reader := clickhouseReader.NewReaderFromClickhouseConnection(
				mock,
				options,
				nil,
				"",
				featureManager.StartManager(),
				"",
				true,
			)

			q := &querier{
				reader: reader,
				builder: queryBuilder.NewQueryBuilder(
					queryBuilder.QueryBuilderOptions{
						BuildTraceQuery: tracesV3.PrepareTracesQuery,
					},
					featureManager.StartManager(),
				),
			}
			// Update query parameters
			params.Start = tc.queryParams.start
			params.End = tc.queryParams.end
			params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
			params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset

			// Execute query
			results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)

			if tc.expectedError {
				require.Error(t, err)
				return
			}

			// Assertions
			require.NoError(t, err, "Query execution failed")
			require.Nil(t, errMap, "Unexpected error map in results")
			require.Len(t, results, 1, "Expected exactly one result set")

			result := results[0]
			require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
			require.Len(t, result.List, len(tc.expectedTimestamps),
				"Result count mismatch: got %d results, expected %d",
				len(result.List), len(tc.expectedTimestamps))

			for i, expected := range tc.expectedTimestamps {
				require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
					"Timestamp mismatch at index %d: got %d, expected %d",
					i, result.List[i].Timestamp.UnixNano(), expected)
			}

			// Verify mock expectations
			err = mock.ExpectationsWereMet()
			require.NoError(t, err, "Mock expectations were not met")
		})
	}
}
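Note: the tsRanges fixture above steps back from the query end in windows of one, two, four, and eight hours, then a final range down to the query start. utils.GetListTsRanges itself is not part of this diff, so the following doubling-window generator is an inference from the fixture, not the actual implementation:

    package main

    import "fmt"

    // tsRange mirrors utils.LogsListTsRange from the test above.
    type tsRange struct{ Start, End int64 }

    // getListTsRanges walks back from the query end in doubling windows
    // (1h, 2h, 4h, ...), newest first, then emits a final catch-all range
    // down to the start. Inferred from the test fixture only.
    func getListTsRanges(start, end int64) []tsRange {
        const hourNanos int64 = 3600 * 1000000000
        ranges := []tsRange{}
        window := hourNanos
        cur := end
        for cur-window > start {
            ranges = append(ranges, tsRange{Start: cur - window, End: cur})
            cur -= window
            window *= 2 // each step back doubles the window size
        }
        ranges = append(ranges, tsRange{Start: start, End: cur})
        return ranges
    }

    func main() {
        // the bounds used in Test_querier_runWindowBasedListQuery; this
        // reproduces the five fixture ranges above, newest first
        for _, r := range getListTsRanges(1722171576000000000, 1722262800000000000) {
            fmt.Println(r.Start, r.End)
        }
    }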
@ -12,6 +12,7 @@ import (
	"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
	tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
	"go.signoz.io/signoz/pkg/query-service/common"
	"go.signoz.io/signoz/pkg/query-service/constants"
	chErrors "go.signoz.io/signoz/pkg/query-service/errors"
	"go.signoz.io/signoz/pkg/query-service/querycache"
	"go.signoz.io/signoz/pkg/query-service/utils"
@ -48,10 +49,11 @@ type querier struct {
	testingMode     bool
	queriesExecuted []string
	// tuple of start and end time in milliseconds
	timeRanges     [][]int
	returnedSeries []*v3.Series
	returnedErr    error
	UseLogsNewSchema bool
	timeRanges        [][]int
	returnedSeries    []*v3.Series
	returnedErr       error
	UseLogsNewSchema  bool
	UseTraceNewSchema bool
}

type QuerierOptions struct {
@ -308,56 +310,121 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
	return results, errQueriesByName, err
}

func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) {
	res := make([]*v3.Result, 0)
	qName := ""
	pageSize := uint64(0)
	limit := uint64(0)
	offset := uint64(0)

	// since we are considering only one query
	for name, v := range params.CompositeQuery.BuilderQueries {
		qName = name
		pageSize = v.PageSize

		// for traces specifically
		limit = v.Limit
		offset = v.Offset
	}
	data := []*v3.Row{}

	tracesLimit := limit + offset

	for _, v := range tsRanges {
		params.Start = v.Start
		params.End = v.End

		params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
		queries, err := q.builder.PrepareQueries(params)
		if err != nil {
			return nil, nil, err
		}

		length := uint64(0)
		// this will run only once
		for name, query := range queries {
			rowList, err := q.reader.GetListResultV3(ctx, query)

		// appending the filter to get the next set of data
		if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs {
			params.CompositeQuery.BuilderQueries[qName].PageSize = pageSize - uint64(len(data))
			queries, err := q.builder.PrepareQueries(params)
			if err != nil {
				errs := []error{err}
				errQuriesByName := map[string]error{
					name: err,
				}
				return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
				return nil, nil, err
			}
			for name, query := range queries {
				rowList, err := q.reader.GetListResultV3(ctx, query)
				if err != nil {
					errs := []error{err}
					errQueriesByName := map[string]error{
						name: err,
					}
					return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
				}
				length += uint64(len(rowList))
				data = append(data, rowList...)
			}
			data = append(data, rowList...)
		}

		// append a filter to the params
		if len(data) > 0 {
			params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
				Key: v3.AttributeKey{
					Key:      "id",
					IsColumn: true,
					DataType: "string",
				},
				Operator: v3.FilterOperatorLessThan,
				Value:    data[len(data)-1].Data["id"],
			})
		}
		if length > 0 {
			params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{
				Key: v3.AttributeKey{
					Key:      "id",
					IsColumn: true,
					DataType: "string",
				},
				Operator: v3.FilterOperatorLessThan,
				Value:    data[len(data)-1].Data["id"],
			})
		}

		if uint64(len(data)) >= pageSize {
			break
			if uint64(len(data)) >= pageSize {
				break
			}
		} else {
			// TRACE
			// we are updating the offset and limit based on the number of traces we have found in the current timerange
			// e.g.:
			// 1) offset = 0, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
			//
			// if 100 traces are there in [t1, t10] then 100 will be returned immediately.
			// if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, and search in the next timerange of [t10, 20]
			// if we don't find any trace in [t1, t10], then we search in [t10, 20] with offset=0, limit=100

			//
			// 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, 20], [t20, t30]
			//
			// If we find 150 traces with limit=150 and offset=0 in [t1, t10] then we immediately return 100 traces
			// If we find 50 in [t1, t10] with limit=150 and offset=0 then it will set limit = 100 and offset=0 and search in the next timerange of [t10, 20]
			// if we don't find any trace in [t1, t10], then we search in [t10, 20] with limit=150 and offset=0

			// max limit + offset is 10k for pagination
			if tracesLimit > constants.TRACE_V4_MAX_PAGINATION_LIMIT {
				return nil, nil, fmt.Errorf("maximum traces that can be paginated is 10000")
			}

			params.CompositeQuery.BuilderQueries[qName].Offset = 0
			params.CompositeQuery.BuilderQueries[qName].Limit = tracesLimit
			queries, err := q.builder.PrepareQueries(params)
			if err != nil {
				return nil, nil, err
			}
			for name, query := range queries {
				rowList, err := q.reader.GetListResultV3(ctx, query)
				if err != nil {
					errs := []error{err}
					errQueriesByName := map[string]error{
						name: err,
					}
					return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
				}
				length += uint64(len(rowList))

				// skip the traces unless offset is 0
				for _, row := range rowList {
					if offset == 0 {
						data = append(data, row)
					} else {
						offset--
					}
				}
			}
			tracesLimit = tracesLimit - length

			if uint64(len(data)) >= limit {
				break
			}
		}
	}
	res = append(res, &v3.Result{
@ -369,14 +436,24 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar

func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
	// List query has support for only one query.
	if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 {
	// we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload
	if params.CompositeQuery != nil &&
		len(params.CompositeQuery.BuilderQueries) == 1 &&
		params.CompositeQuery.PanelType != v3.PanelTypeTrace {
		for _, v := range params.CompositeQuery.BuilderQueries {
			if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) ||
				(v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) {
				break
			}

			// only allow logs queries with timestamp ordering desc
			if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" {
				startEndArr := utils.GetLogsListTsRanges(params.Start, params.End)
				if len(startEndArr) > 0 {
					return q.runLogsListQuery(ctx, params, startEndArr)
				}
			// TODO(nitya): allow for timestamp asc
			if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) &&
				len(v.OrderBy) == 1 &&
				v.OrderBy[0].ColumnName == "timestamp" &&
				v.OrderBy[0].Order == "desc" {
				startEndArr := utils.GetListTsRanges(params.Start, params.End)
				return q.runWindowBasedListQuery(ctx, params, startEndArr)
			}
		}
	}
@ -416,13 +493,13 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
	close(ch)

	var errs []error
	errQuriesByName := make(map[string]error)
	errQueriesByName := make(map[string]error)
	res := make([]*v3.Result, 0)
	// read values from the channel
	for r := range ch {
		if r.Err != nil {
			errs = append(errs, r.Err)
			errQuriesByName[r.Name] = r.Err
			errQueriesByName[r.Name] = r.Err
			continue
		}
		res = append(res, &v3.Result{
@ -431,7 +508,7 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
		})
	}
	if len(errs) != 0 {
		return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
		return nil, errQueriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
	}
	return res, nil, nil
}
@ -5,15 +5,21 @@ import (
	"encoding/json"
	"fmt"
	"math"
	"regexp"
	"strings"
	"testing"
	"time"

	cmock "github.com/srikanthccv/ClickHouse-go-mock"
	"github.com/stretchr/testify/require"
	"go.signoz.io/signoz/pkg/query-service/app/clickhouseReader"
	"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
	tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3"
	"go.signoz.io/signoz/pkg/query-service/cache/inmemory"
	"go.signoz.io/signoz/pkg/query-service/featureManager"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/querycache"
	"go.signoz.io/signoz/pkg/query-service/utils"
)

func minTimestamp(series []*v3.Series) int64 {
@ -798,8 +804,8 @@ func TestV2QueryRangeValueType(t *testing.T) {
	}
	q := NewQuerier(opts)
	expectedTimeRangeInQueryString := []string{
		fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), // 31st Jan, 03:23:00 to 31st Jan, 05:23:00
		fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
		fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115520000, 1675115580000+120*60*1000), // 31st Jan, 03:23:00 to 31st Jan, 05:23:00
		fmt.Sprintf("unix_milli >= %d AND unix_milli < %d", 1675115580000+120*60*1000, 1675115580000+180*60*1000), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
		fmt.Sprintf("timestamp >= '%d' AND timestamp <= '%d'", (1675119196722)*int64(1000000), (1675126396722)*int64(1000000)), // 31st Jan, 05:23:00 to 31st Jan, 06:23:00
	}
@ -1178,3 +1184,304 @@ func TestV2QueryRangeValueTypePromQL(t *testing.T) {
		}
	}
}

type regexMatcher struct {
}

func (m *regexMatcher) Match(expectedSQL, actualSQL string) error {
	re, err := regexp.Compile(expectedSQL)
	if err != nil {
		return err
	}
	if !re.MatchString(actualSQL) {
		return fmt.Errorf("expected query to contain %s, got %s", expectedSQL, actualSQL)
	}
	return nil
}

func Test_querier_runWindowBasedListQuery(t *testing.T) {
	params := &v3.QueryRangeParamsV3{
		Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
		End:   1722262800000000000, // July 29, 2024 7:50:00 PM
		CompositeQuery: &v3.CompositeQuery{
			PanelType: v3.PanelTypeList,
			BuilderQueries: map[string]*v3.BuilderQuery{
				"A": {
					QueryName:         "A",
					Expression:        "A",
					DataSource:        v3.DataSourceTraces,
					PageSize:          10,
					Limit:             100,
					StepInterval:      60,
					AggregateOperator: v3.AggregateOperatorNoOp,
					SelectColumns:     []v3.AttributeKey{{Key: "serviceName"}},
					Filters: &v3.FilterSet{
						Operator: "AND",
						Items:    []v3.FilterItem{},
					},
				},
			},
		},
	}

	tsRanges := []utils.LogsListTsRange{
		{
			Start: 1722259200000000000, // July 29, 2024 6:50:00 PM
			End:   1722262800000000000, // July 29, 2024 7:50:00 PM
		},
		{
			Start: 1722252000000000000, // July 29, 2024 4:50:00 PM
			End:   1722259200000000000, // July 29, 2024 6:50:00 PM
		},
		{
			Start: 1722237600000000000, // July 29, 2024 12:50:00 PM
			End:   1722252000000000000, // July 29, 2024 4:50:00 PM
		},
		{
			Start: 1722208800000000000, // July 29, 2024 4:50:00 AM
			End:   1722237600000000000, // July 29, 2024 12:50:00 PM
		},
		{
			Start: 1722171576000000000, // July 28, 2024 6:29:36 PM
			End:   1722208800000000000, // July 29, 2024 4:50:00 AM
		},
	}

	type queryParams struct {
		start  int64
		end    int64
		limit  uint64
		offset uint64
	}

	type queryResponse struct {
		expectedQuery string
		timestamps    []uint64
	}

	// create test struct with mock data, i.e. array of timestamps, limit, offset and expected results
	testCases := []struct {
		name               string
		queryResponses     []queryResponse
		queryParams        queryParams
		expectedTimestamps []int64
		expectedError      bool
	}{
		{
			name: "should return correct timestamps when querying within time window",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 2",
					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  2,
				offset: 0,
			},
			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000},
		},
		{
			name: "all data not in first windows",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 3",
					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 1",
					timestamps:    []uint64{1722253000000000000},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  3,
				offset: 0,
			},
			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000},
		},
		{
			name: "data in multiple windows",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 5",
					timestamps:    []uint64{1722259300000000000, 1722259400000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 3",
					timestamps:    []uint64{1722253000000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 2",
					timestamps:    []uint64{1722237700000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 1",
					timestamps:    []uint64{},
				},
				{
					expectedQuery: ".*(timestamp >= '1722171576000000000' AND timestamp <= '1722208800000000000').* DESC LIMIT 1",
					timestamps:    []uint64{},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  5,
				offset: 0,
			},
			expectedTimestamps: []int64{1722259300000000000, 1722259400000000000, 1722253000000000000, 1722237700000000000},
		},
		{
			name: "query with offset",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 7",
					timestamps:    []uint64{1722259210000000000, 1722259220000000000, 1722259230000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 4",
					timestamps:    []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 1",
					timestamps:    []uint64{1722237700000000000},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  4,
				offset: 3,
			},
			expectedTimestamps: []int64{1722253000000000000, 1722254000000000000, 1722255000000000000, 1722237700000000000},
		},
		{
			name: "query with offset and limit- data spread across multiple windows",
			queryResponses: []queryResponse{
				{
					expectedQuery: ".*(timestamp >= '1722259200000000000' AND timestamp <= '1722262800000000000').* DESC LIMIT 11",
					timestamps:    []uint64{},
				},
				{
					expectedQuery: ".*(timestamp >= '1722252000000000000' AND timestamp <= '1722259200000000000').* DESC LIMIT 11",
					timestamps:    []uint64{1722253000000000000, 1722254000000000000, 1722255000000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722237600000000000' AND timestamp <= '1722252000000000000').* DESC LIMIT 8",
					timestamps:    []uint64{1722237700000000000, 1722237800000000000, 1722237900000000000, 1722237910000000000, 1722237920000000000},
				},
				{
					expectedQuery: ".*(timestamp >= '1722208800000000000' AND timestamp <= '1722237600000000000').* DESC LIMIT 3",
					timestamps:    []uint64{1722208810000000000, 1722208820000000000, 1722208830000000000},
				},
			},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  5,
				offset: 6,
			},
			expectedTimestamps: []int64{1722237910000000000, 1722237920000000000, 1722208810000000000, 1722208820000000000, 1722208830000000000},
		},
		{
			name:           "don't allow pagination to get more than 10k spans",
			queryResponses: []queryResponse{},
			queryParams: queryParams{
				start:  1722171576000000000,
				end:    1722262800000000000,
				limit:  10,
				offset: 9991,
			},
			expectedError: true,
		},
	}

	cols := []cmock.ColumnType{
		{Name: "timestamp", Type: "UInt64"},
		{Name: "name", Type: "String"},
	}
	testName := "name"

	options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace")

	// iterate over test data, create reader and run test
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Setup mock
			mock, err := cmock.NewClickHouseWithQueryMatcher(nil, &regexMatcher{})
			require.NoError(t, err, "Failed to create ClickHouse mock")

			// Configure mock responses
			for _, response := range tc.queryResponses {
				values := make([][]any, 0, len(response.timestamps))
				for _, ts := range response.timestamps {
					values = append(values, []any{&ts, &testName})
				}
				// if len(values) > 0 {
				mock.ExpectQuery(response.expectedQuery).WillReturnRows(
					cmock.NewRows(cols, values),
				)
				// }
			}

			// Create reader and querier
			reader := clickhouseReader.NewReaderFromClickhouseConnection(
				mock,
				options,
				nil,
				"",
				featureManager.StartManager(),
				"",
				true,
			)

			q := &querier{
				reader: reader,
				builder: queryBuilder.NewQueryBuilder(
					queryBuilder.QueryBuilderOptions{
						BuildTraceQuery: tracesV3.PrepareTracesQuery,
					},
					featureManager.StartManager(),
				),
			}
			// Update query parameters
			params.Start = tc.queryParams.start
			params.End = tc.queryParams.end
			params.CompositeQuery.BuilderQueries["A"].Limit = tc.queryParams.limit
			params.CompositeQuery.BuilderQueries["A"].Offset = tc.queryParams.offset

			// Execute query
			results, errMap, err := q.runWindowBasedListQuery(context.Background(), params, tsRanges)

			if tc.expectedError {
				require.Error(t, err)
				return
			}

			// Assertions
			require.NoError(t, err, "Query execution failed")
			require.Nil(t, errMap, "Unexpected error map in results")
			require.Len(t, results, 1, "Expected exactly one result set")

			result := results[0]
			require.Equal(t, "A", result.QueryName, "Incorrect query name in results")
			require.Len(t, result.List, len(tc.expectedTimestamps),
				"Result count mismatch: got %d results, expected %d",
				len(result.List), len(tc.expectedTimestamps))

			for i, expected := range tc.expectedTimestamps {
				require.Equal(t, expected, result.List[i].Timestamp.UnixNano(),
					"Timestamp mismatch at index %d: got %d, expected %d",
					i, result.List[i].Timestamp.UnixNano(), expected)
			}

			// Verify mock expectations
			err = mock.ExpectationsWereMet()
			require.NoError(t, err, "Mock expectations were not met")
		})
	}
}
@ -384,6 +384,11 @@ func LogCommentEnricher(next http.Handler) http.Handler {
			client = "api"
		}

		email, err := auth.GetEmailFromJwt(r.Context())
		if err != nil {
			zap.S().Errorf("error while getting email from jwt: %v", err)
		}

		kvs := map[string]string{
			"path":        path,
			"dashboardID": dashboardID,
@ -392,6 +397,7 @@ func LogCommentEnricher(next http.Handler) http.Handler {
			"client":      client,
			"viewName":    viewName,
			"servicesTab": tab,
			"email":       email,
		}

		r = r.WithContext(context.WithValue(r.Context(), common.LogCommentKey, kvs))
pkg/query-service/app/traces/v4/enrich.go (new file, 118 lines)
@ -0,0 +1,118 @@
package v4

import (
	"go.signoz.io/signoz/pkg/query-service/constants"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
	"go.signoz.io/signoz/pkg/query-service/utils"
)

// if the field is timestamp/id/value we don't need to enrich
// if the field is static we don't need to enrich
// for all others we need to enrich
// an attribute/resource can be materialized/dematerialized
// but the query should work regardless and shouldn't fail
func isEnriched(field v3.AttributeKey) bool {
	// if it is timestamp/id don't check
	if field.Key == "timestamp" || field.Key == constants.SigNozOrderByValue {
		return true
	}

	// we need to check if the field is static and return false if isColumn is not set
	if _, ok := constants.StaticFieldsTraces[field.Key]; ok && field.IsColumn {
		return true
	}

	return false
}

func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) v3.AttributeKey {
	if isEnriched(key) {
		return key
	}

	if v, ok := constants.StaticFieldsTraces[key.Key]; ok {
		return v
	}

	for _, key := range utils.GenerateEnrichmentKeys(key) {
		if val, ok := keys[key]; ok {
			return val
		}
	}

	// enrich with default values if metadata is not found
	if key.Type == "" {
		key.Type = v3.AttributeKeyTypeTag
	}
	if key.DataType == "" {
		key.DataType = v3.AttributeKeyDataTypeString
	}
	return key
}

func Enrich(params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) {
	if params.CompositeQuery.QueryType != v3.QueryTypeBuilder {
		return
	}

	for _, query := range params.CompositeQuery.BuilderQueries {
		if query.DataSource == v3.DataSourceTraces {
			EnrichTracesQuery(query, keys)
		}
	}
}

func EnrichTracesQuery(query *v3.BuilderQuery, keys map[string]v3.AttributeKey) {
	// enrich aggregate attribute
	query.AggregateAttribute = enrichKeyWithMetadata(query.AggregateAttribute, keys)

	// enrich filter items
	if query.Filters != nil && len(query.Filters.Items) > 0 {
		for idx, filter := range query.Filters.Items {
			query.Filters.Items[idx].Key = enrichKeyWithMetadata(filter.Key, keys)
			// if the serviceName column is used, use the corresponding resource attribute as well during filtering
			// since there is only one of these resource attributes we are adding it here directly.
			// move it somewhere else if this list is big
			if filter.Key.Key == "serviceName" {
				query.Filters.Items[idx].Key = v3.AttributeKey{
					Key:      "service.name",
					DataType: v3.AttributeKeyDataTypeString,
					Type:     v3.AttributeKeyTypeResource,
					IsColumn: false,
				}
			}
		}
	}

	// enrich group by
	for idx, groupBy := range query.GroupBy {
		query.GroupBy[idx] = enrichKeyWithMetadata(groupBy, keys)
	}

	// enrich order by
	query.OrderBy = enrichOrderBy(query.OrderBy, keys)

	// enrich select columns
	for idx, selectColumn := range query.SelectColumns {
		query.SelectColumns[idx] = enrichKeyWithMetadata(selectColumn, keys)
	}

}

func enrichOrderBy(items []v3.OrderBy, keys map[string]v3.AttributeKey) []v3.OrderBy {
	enrichedItems := []v3.OrderBy{}
	for i := 0; i < len(items); i++ {
		attributeKey := enrichKeyWithMetadata(v3.AttributeKey{
			Key: items[i].ColumnName,
		}, keys)
		enrichedItems = append(enrichedItems, v3.OrderBy{
			ColumnName: items[i].ColumnName,
			Order:      items[i].Order,
			Key:        attributeKey.Key,
			DataType:   attributeKey.DataType,
			Type:       attributeKey.Type,
			IsColumn:   attributeKey.IsColumn,
		})
	}
	return enrichedItems
}
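Note: a usage sketch for the new entry points in this file. The wrapper is illustrative; the metadata-map key format (key##type##datatype) is taken from the tests that follow:

    package main

    import (
        "fmt"

        tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4"
        v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
    )

    func main() {
        query := &v3.BuilderQuery{
            DataSource: v3.DataSourceTraces,
            Filters: &v3.FilterSet{
                Operator: "AND",
                Items: []v3.FilterItem{
                    {Key: v3.AttributeKey{Key: "bytes"}, Value: 100, Operator: ">"},
                },
            },
        }
        keys := map[string]v3.AttributeKey{
            "bytes##tag##int64": {Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64},
        }
        // fills in Type/DataType (and IsColumn where known) on every key in the query
        tracesV4.EnrichTracesQuery(query, keys)
        fmt.Println(query.Filters.Items[0].Key.DataType) // int64
    }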
pkg/query-service/app/traces/v4/enrich_test.go (new file, 196 lines)
@ -0,0 +1,196 @@
|
||||
package v4
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
func TestEnrichTracesQuery(t *testing.T) {
|
||||
type args struct {
|
||||
query *v3.BuilderQuery
|
||||
keys map[string]v3.AttributeKey
|
||||
want *v3.BuilderQuery
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
}{
|
||||
{
|
||||
name: "test 1",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: ">"},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
keys: map[string]v3.AttributeKey{
|
||||
"bytes##tag##int64": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag},
|
||||
},
|
||||
want: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64}, Value: 100, Operator: ">"},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test service name",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "serviceName", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "myservice", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "serviceName"}, Value: "myservice", Operator: "="},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
keys: map[string]v3.AttributeKey{},
|
||||
want: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test mat attrs",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "/api", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "msgSystem"}, Value: "name", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "external_http_url"}, Value: "name", Operator: "="},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
keys: map[string]v3.AttributeKey{},
|
||||
want: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "name", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "external_http_url", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "name", Operator: "="},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test aggregateattr, filter, groupby, order by",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
AggregateOperator: v3.AggregateOperatorCount,
|
||||
AggregateAttribute: v3.AttributeKey{
|
||||
Key: "http.route",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString}, Value: "/api", Operator: "="},
},
},
GroupBy: []v3.AttributeKey{
{Key: "http.route", DataType: v3.AttributeKeyDataTypeString},
{Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString},
},
OrderBy: []v3.OrderBy{
{ColumnName: "httpRoute", Order: v3.DirectionAsc},
},
},
keys: map[string]v3.AttributeKey{
"http.route##tag##string": {Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
},
want: &v3.BuilderQuery{
AggregateAttribute: v3.AttributeKey{
Key: "http.route",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
IsColumn: true,
},
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="},
},
},
GroupBy: []v3.AttributeKey{
{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
{Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString, IsJSON: false, IsColumn: true},
},
OrderBy: []v3.OrderBy{
{Key: "httpRoute", Order: v3.DirectionAsc, ColumnName: "httpRoute", DataType: v3.AttributeKeyDataTypeString, IsColumn: true},
},
},
},
},
{
name: "enrich default values",
args: args{
query: &v3.BuilderQuery{
Filters: &v3.FilterSet{
Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "testattr"}},
},
},
OrderBy: []v3.OrderBy{{ColumnName: "timestamp", Order: v3.DirectionAsc}},
},
keys: map[string]v3.AttributeKey{},
want: &v3.BuilderQuery{
Filters: &v3.FilterSet{
Items: []v3.FilterItem{{Key: v3.AttributeKey{Key: "testattr", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeString}}},
},
// isColumn doesn't matter for timestamp, since it is always a column
OrderBy: []v3.OrderBy{{Key: "timestamp", Order: v3.DirectionAsc, ColumnName: "timestamp"}},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
EnrichTracesQuery(tt.args.query, tt.args.keys)
// Check AggregateAttribute
if tt.args.query.AggregateAttribute.Key != "" && !reflect.DeepEqual(tt.args.query.AggregateAttribute, tt.args.want.AggregateAttribute) {
t.Errorf("EnrichTracesQuery() AggregateAttribute = %v, want %v", tt.args.query.AggregateAttribute, tt.args.want.AggregateAttribute)
}

// Check Filters
if tt.args.query.Filters != nil && !reflect.DeepEqual(tt.args.query.Filters, tt.args.want.Filters) {
t.Errorf("EnrichTracesQuery() Filters = %v, want %v", tt.args.query.Filters, tt.args.want.Filters)
}

// Check GroupBy
if tt.args.query.GroupBy != nil && !reflect.DeepEqual(tt.args.query.GroupBy, tt.args.want.GroupBy) {
t.Errorf("EnrichTracesQuery() GroupBy = %v, want %v", tt.args.query.GroupBy, tt.args.want.GroupBy)
}

// Check OrderBy
if tt.args.query.OrderBy != nil && !reflect.DeepEqual(tt.args.query.OrderBy, tt.args.want.OrderBy) {
t.Errorf("EnrichTracesQuery() OrderBy = %v, want %v", tt.args.query.OrderBy, tt.args.want.OrderBy)
}
})
}
}
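For context, a minimal sketch of the call these cases exercise, assuming the same package as EnrichTracesQuery and purely illustrative values: the bare key "http.route" picks up Type, DataType and IsColumn from the key##type##datatype lookup map.

package tracesv4 // assumed: whatever package hosts EnrichTracesQuery

import (
    "fmt"

    v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func sketchEnrich() {
    keys := map[string]v3.AttributeKey{
        "http.route##tag##string": {Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
    }
    query := &v3.BuilderQuery{
        Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
            {Key: v3.AttributeKey{Key: "http.route"}, Value: "/api", Operator: "="},
        }},
    }
    EnrichTracesQuery(query, keys) // enriches in place, as the assertions above check
    fmt.Println(query.Filters.Items[0].Key.IsColumn) // true after enrichment
}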
@ -124,6 +124,27 @@ func Test_getColumnName(t *testing.T) {
},
want: "attributes_string['xyz']",
},
{
name: "new composite column",
args: args{
key: v3.AttributeKey{Key: "response_status_code"},
},
want: "response_status_code",
},
{
name: "new composite column with metadata",
args: args{
key: v3.AttributeKey{Key: "response_status_code", DataType: v3.AttributeKeyDataTypeString, IsColumn: true},
},
want: "response_status_code",
},
{
name: "new normal column with metadata",
args: args{
key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
},
want: "`attribute_string_http$$route`",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
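The last case above expects the dotted tag http.route to resolve to a materialized column name. A hedged sketch of the naming scheme those expectations imply; this helper is illustrative, not the actual getColumnName, which also handles static columns and map access such as attributes_string['xyz'].

import (
    "fmt"
    "strings"

    v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

// materializedColumnSketch mirrors the expected output `attribute_string_http$$route`:
// <attribute_type>_<attribute_datatype>_<key> with "." escaped as "$$", in backticks.
func materializedColumnSketch(key v3.AttributeKey) string {
    prefix := "attribute" // assumed: "resource" for resource attributes
    if key.Type == v3.AttributeKeyTypeResource {
        prefix = "resource"
    }
    escaped := strings.ReplaceAll(key.Key, ".", "$$")
    return fmt.Sprintf("`%s_%s_%s`", prefix, key.DataType, escaped)
}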
@ -589,4 +589,60 @@ var StaticFieldsTraces = map[string]v3.AttributeKey{
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},

// new support
"response_status_code": {
Key: "response_status_code",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
"external_http_url": {
Key: "external_http_url",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
"http_url": {
Key: "http_url",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
"external_http_method": {
Key: "external_http_method",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
"http_method": {
Key: "http_method",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
"http_host": {
Key: "http_host",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
"db_name": {
Key: "db_name",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
"db_operation": {
Key: "db_operation",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
"has_error": {
Key: "has_error",
DataType: v3.AttributeKeyDataTypeBool,
IsColumn: true,
},
"is_remote": {
Key: "is_remote",
DataType: v3.AttributeKeyDataTypeString,
IsColumn: true,
},
// the simple attributes are not present here as
// they are taken care of by the new format <attribute_type>_<attribute_datatype>_'<attribute_key>'
}

const TRACE_V4_MAX_PAGINATION_LIMIT = 10000
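A short, hedged sketch of how a cap like this is typically applied to caller-supplied page sizes; the call sites themselves are not part of this diff, and the non-positive-limit handling is an assumption.

// clampTraceLimit caps a requested page size at the v4 maximum; treating
// non-positive limits as "use the maximum" is assumed for illustration.
func clampTraceLimit(limit int) int {
    if limit <= 0 || limit > TRACE_V4_MAX_PAGINATION_LIMIT {
        return TRACE_V4_MAX_PAGINATION_LIMIT
    }
    return limit
}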
@ -183,7 +183,7 @@ func PrepareFilters(labels map[string]string, whereClauseItems []v3.FilterItem,
var attrFound bool

// as of now this logic applies only to logs
for _, tKey := range utils.GenerateLogEnrichmentKeys(v3.AttributeKey{Key: key}) {
for _, tKey := range utils.GenerateEnrichmentKeys(v3.AttributeKey{Key: key}) {
if val, ok := keys[tKey]; ok {
attributeKey = val
attrFound = true
@ -29,15 +29,10 @@ type Reader interface {
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
// clickhouse only.
GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError)
GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError)
GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error)
GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError)
GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError)
GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError)
GetFilteredSpansAggregates(ctx context.Context, query *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError)

ListErrors(ctx context.Context, params *model.ListErrorsParams) (*[]model.Error, *model.ApiError)
CountErrors(ctx context.Context, params *model.CountErrorsParams) (uint64, *model.ApiError)
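For orientation, a hedged sketch of consuming the slimmed-down interface. The empty request and the response's AttributeKeys field are assumptions based on the v3 naming, since neither appears in this diff.

import (
    "context"

    v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func listTraceKeys(ctx context.Context, r Reader) ([]v3.AttributeKey, error) {
    // an empty request is used purely for illustration
    resp, err := r.GetTraceAttributeKeys(ctx, &v3.FilterAttributeKeyRequest{})
    if err != nil {
        return nil, err
    }
    return resp.AttributeKeys, nil // field name assumed
}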
@ -546,6 +546,9 @@ type SignozLogV2 struct {
SeverityText string `json:"severity_text" ch:"severity_text"`
SeverityNumber uint8 `json:"severity_number" ch:"severity_number"`
Body string `json:"body" ch:"body"`
ScopeName string `json:"scope_name" ch:"scope_name"`
ScopeVersion string `json:"scope_version" ch:"scope_version"`
ScopeString map[string]string `json:"scope_string" ch:"scope_string"`
Resources_string map[string]string `json:"resources_string" ch:"resources_string"`
Attributes_string map[string]string `json:"attributes_string" ch:"attributes_string"`
Attributes_number map[string]float64 `json:"attributes_float" ch:"attributes_number"`
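The three Scope* fields are the additions in this hunk. A small sketch of a populated record, with illustrative values:

log := SignozLogV2{
    SeverityText:      "INFO",
    SeverityNumber:    9,
    Body:              "user logged in",
    ScopeName:         "example/instrumentation", // new scope fields
    ScopeVersion:      "v0.1.0",
    ScopeString:       map[string]string{"scope.attr": "value"},
    Resources_string:  map[string]string{"service.name": "api"},
    Attributes_string: map[string]string{"http.route": "/api"},
    Attributes_number: map[string]float64{"duration_ms": 12.5},
}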
@ -463,9 +463,9 @@ func (r *BaseRule) ShouldAlert(series v3.Series) (Sample, bool) {
}
} else if r.compareOp() == ValueOutsideBounds {
for _, smpl := range series.Points {
if math.Abs(smpl.Value) >= r.targetVal() {
if math.Abs(smpl.Value) < r.targetVal() {
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
shouldAlert = true
shouldAlert = false
break
}
}
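Only a fragment of ShouldAlert is visible, so the following is a hedged reading rather than the rule's full logic: the ValueOutsideBounds comparison is on the absolute value, and the flipped condition and flag suggest the loop now assumes the alert holds and cancels it once any point falls back inside the target band.

import "math"

// outsideBoundsSketch is an illustrative reading of the hunk above; the
// initialization of shouldAlert to true is assumed, as it is not in this diff.
func outsideBoundsSketch(points []float64, target float64) bool {
    shouldAlert := true // assumed initialization
    for _, v := range points {
        if math.Abs(v) < target { // the flipped comparison
            shouldAlert = false // one in-bounds point cancels the alert
            break
        }
    }
    return shouldAlert
}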
@ -347,6 +347,7 @@ func createTelemetry() {
"alertsWithTSV2": alertsInfo.AlertsWithTSV2,
"logsBasedAlerts": alertsInfo.LogsBasedAlerts,
"metricBasedAlerts": alertsInfo.MetricBasedAlerts,
"anomalyBasedAlerts": alertsInfo.AnomalyBasedAlerts,
"tracesBasedAlerts": alertsInfo.TracesBasedAlerts,
"totalChannels": alertsInfo.TotalChannels,
"totalSavedViews": savedViewsInfo.TotalSavedViews,
@ -9,7 +9,7 @@ type LogsListTsRange struct {
End int64
}

func GetLogsListTsRanges(start, end int64) []LogsListTsRange {
func GetListTsRanges(start, end int64) []LogsListTsRange {
startNano := GetEpochNanoSecs(start)
endNano := GetEpochNanoSecs(end)
result := []LogsListTsRange{}
@ -35,13 +35,15 @@ func GetLogsListTsRanges(start, end int64) []LogsListTsRange {
tStartNano = startNano
}
}
} else {
result = append(result, LogsListTsRange{Start: start, End: end})
}
return result
}
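With the new else branch, a window shorter than the chunking threshold now comes back as a single range covering the whole window instead of an empty slice, which is exactly what the updated test further down asserts. A small usage sketch, with timestamps borrowed from that test:

// ~17-minute window, nanosecond epochs
ranges := GetListTsRanges(1722262800000000000, 1722263800000000000)
for _, r := range ranges {
    fmt.Println(r.Start, r.End) // one range spanning the full window
}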
// This tries to enumerate all the possible field keys it can fall back to if some metadata is missing
// check Test_GenerateLogEnrichmentKeys for example
func GenerateLogEnrichmentKeys(field v3.AttributeKey) []string {
// check Test_GenerateEnrichmentKeys for example
func GenerateEnrichmentKeys(field v3.AttributeKey) []string {
names := []string{}
if field.Type != v3.AttributeKeyTypeUnspecified && field.DataType != v3.AttributeKeyDataTypeUnspecified {
names = append(names, field.Key+"##"+field.Type.String()+"##"+field.DataType.String())
@ -7,7 +7,7 @@ import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func TestLogsListTsRange(t *testing.T) {
func TestListTsRange(t *testing.T) {
startEndData := []struct {
name string
start int64
@ -18,7 +18,7 @@ func TestLogsListTsRange(t *testing.T) {
name: "testing for less than one hour",
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
res: []LogsListTsRange{},
res: []LogsListTsRange{{1722262800000000000, 1722263800000000000}},
},
{
name: "testing for more than one hour",
@ -44,7 +44,7 @@ func TestLogsListTsRange(t *testing.T) {
}

for _, test := range startEndData {
res := GetLogsListTsRanges(test.start, test.end)
res := GetListTsRanges(test.start, test.end)
for i, v := range res {
if test.res[i].Start != v.Start || test.res[i].End != v.End {
t.Errorf("expected range was %v - %v, got %v - %v", test.res[i].Start, test.res[i].End, v.Start, v.End)
@ -53,7 +53,7 @@ func TestLogsListTsRange(t *testing.T) {
}
}

func Test_GenerateLogEnrichmentKeys(t *testing.T) {
func Test_GenerateEnrichmentKeys(t *testing.T) {
type args struct {
field v3.AttributeKey
}
@ -96,8 +96,8 @@ func Test_GenerateLogEnrichmentKeys(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GenerateLogEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) {
t.Errorf("generateLogEnrichmentKeys() = %v, want %v", got, tt.want)
if got := GenerateEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) {
t.Errorf("GenerateEnrichmentKeys() = %v, want %v", got, tt.want)
}
})
}
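Putting the rename in context: with full metadata, GenerateEnrichmentKeys returns the single exact key built as key##type##datatype, per the implementation above; when type or datatype is missing it appends fallback combinations instead. An illustrative call:

keys := GenerateEnrichmentKeys(v3.AttributeKey{
    Key:      "user.id", // illustrative field
    Type:     v3.AttributeKeyTypeTag,
    DataType: v3.AttributeKeyDataTypeString,
})
fmt.Println(keys) // [user.id##tag##string] when both type and datatype are set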