mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-05 11:00:40 +08:00
Merge branch 'develop' into release/v0.58.x
This commit is contained in:
commit
a9ced66258
@ -1,5 +1,5 @@
|
||||
# use a minimal alpine image
|
||||
FROM alpine:3.18.6
|
||||
FROM alpine:3.20.3
|
||||
|
||||
# Add Maintainer Info
|
||||
LABEL maintainer="signoz"
|
||||
|
@ -67,6 +67,30 @@ func StartManager(dbType string, db *sqlx.DB, useLicensesV3 bool, features ...ba
|
||||
repo: &repo,
|
||||
}
|
||||
|
||||
if useLicensesV3 {
|
||||
// get active license from the db
|
||||
active, err := m.repo.GetActiveLicense(context.Background())
|
||||
if err != nil {
|
||||
return m, err
|
||||
}
|
||||
|
||||
// if we have an active license then need to fetch the complete details
|
||||
if active != nil {
|
||||
// fetch the new license structure from control plane
|
||||
licenseV3, apiError := validate.ValidateLicenseV3(active.Key)
|
||||
if apiError != nil {
|
||||
return m, apiError
|
||||
}
|
||||
|
||||
// insert the licenseV3 in sqlite db
|
||||
apiError = m.repo.InsertLicenseV3(context.Background(), licenseV3)
|
||||
// if the license already exists move ahead.
|
||||
if apiError != nil && apiError.Typ != model.ErrorConflict {
|
||||
return m, apiError
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := m.start(useLicensesV3, features...); err != nil {
|
||||
return m, err
|
||||
}
|
||||
|
@ -61,6 +61,11 @@ func NewAnomalyRule(
|
||||
|
||||
zap.L().Info("creating new AnomalyRule", zap.String("id", id), zap.Any("opts", opts))
|
||||
|
||||
if p.RuleCondition.CompareOp == baserules.ValueIsBelow {
|
||||
target := -1 * *p.RuleCondition.Target
|
||||
p.RuleCondition.Target = &target
|
||||
}
|
||||
|
||||
baseRule, err := baserules.NewBaseRule(id, p, reader, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -8,7 +8,7 @@ import { ALERTS_DATA_SOURCE_MAP } from 'constants/alerts';
|
||||
import ROUTES from 'constants/routes';
|
||||
import useComponentPermission from 'hooks/useComponentPermission';
|
||||
import useFetch from 'hooks/useFetch';
|
||||
import { useCallback, useEffect, useState } from 'react';
|
||||
import { useCallback, useEffect, useRef, useState } from 'react';
|
||||
import { useTranslation } from 'react-i18next';
|
||||
import { useSelector } from 'react-redux';
|
||||
import { AppState } from 'store/reducers';
|
||||
@ -83,16 +83,22 @@ function BasicInfo({
|
||||
window.open(ROUTES.CHANNELS_NEW, '_blank');
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, []);
|
||||
const hasLoggedEvent = useRef(false);
|
||||
|
||||
useEffect(() => {
|
||||
if (!channels.loading && isNewRule) {
|
||||
if (!channels.loading && isNewRule && !hasLoggedEvent.current) {
|
||||
logEvent('Alert: New alert creation page visited', {
|
||||
dataSource: ALERTS_DATA_SOURCE_MAP[alertDef?.alertType as AlertTypes],
|
||||
numberOfChannels: channels?.payload?.length,
|
||||
});
|
||||
hasLoggedEvent.current = true;
|
||||
}
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [channels.payload, channels.loading]);
|
||||
}, [channels.loading]);
|
||||
|
||||
const refetchChannels = async (): Promise<void> => {
|
||||
await channels.refetch();
|
||||
};
|
||||
|
||||
return (
|
||||
<>
|
||||
@ -197,7 +203,7 @@ function BasicInfo({
|
||||
{!shouldBroadCastToAllChannels && (
|
||||
<Tooltip
|
||||
title={
|
||||
noChannels
|
||||
noChannels && !addNewChannelPermission
|
||||
? 'No channels. Ask an admin to create a notification channel'
|
||||
: undefined
|
||||
}
|
||||
@ -212,10 +218,10 @@ function BasicInfo({
|
||||
]}
|
||||
>
|
||||
<ChannelSelect
|
||||
disabled={
|
||||
shouldBroadCastToAllChannels || noChannels || !!channels.loading
|
||||
}
|
||||
onDropdownOpen={refetchChannels}
|
||||
disabled={shouldBroadCastToAllChannels}
|
||||
currentValue={alertDef.preferredChannels}
|
||||
handleCreateNewChannels={handleCreateNewChannels}
|
||||
channels={channels}
|
||||
onSelectChannels={(preferredChannels): void => {
|
||||
setAlertDef({
|
||||
|
@ -1,24 +1,33 @@
|
||||
import { Select } from 'antd';
|
||||
import { PlusOutlined } from '@ant-design/icons';
|
||||
import { Select, Spin } from 'antd';
|
||||
import useComponentPermission from 'hooks/useComponentPermission';
|
||||
import { State } from 'hooks/useFetch';
|
||||
import { useNotifications } from 'hooks/useNotifications';
|
||||
import { ReactNode } from 'react';
|
||||
import { useTranslation } from 'react-i18next';
|
||||
import { useSelector } from 'react-redux';
|
||||
import { AppState } from 'store/reducers';
|
||||
import { PayloadProps } from 'types/api/channels/getAll';
|
||||
import AppReducer from 'types/reducer/app';
|
||||
|
||||
import { StyledSelect } from './styles';
|
||||
import { StyledCreateChannelOption, StyledSelect } from './styles';
|
||||
|
||||
export interface ChannelSelectProps {
|
||||
disabled?: boolean;
|
||||
currentValue?: string[];
|
||||
onSelectChannels: (s: string[]) => void;
|
||||
onDropdownOpen: () => void;
|
||||
channels: State<PayloadProps | undefined>;
|
||||
handleCreateNewChannels: () => void;
|
||||
}
|
||||
|
||||
function ChannelSelect({
|
||||
disabled,
|
||||
currentValue,
|
||||
onSelectChannels,
|
||||
onDropdownOpen,
|
||||
channels,
|
||||
handleCreateNewChannels,
|
||||
}: ChannelSelectProps): JSX.Element | null {
|
||||
// init namespace for translations
|
||||
const { t } = useTranslation('alerts');
|
||||
@ -26,6 +35,10 @@ function ChannelSelect({
|
||||
const { notifications } = useNotifications();
|
||||
|
||||
const handleChange = (value: string[]): void => {
|
||||
if (value.includes('add-new-channel')) {
|
||||
handleCreateNewChannels();
|
||||
return;
|
||||
}
|
||||
onSelectChannels(value);
|
||||
};
|
||||
|
||||
@ -35,9 +48,27 @@ function ChannelSelect({
|
||||
description: channels.errorMessage,
|
||||
});
|
||||
}
|
||||
|
||||
const { role } = useSelector<AppState, AppReducer>((state) => state.app);
|
||||
const [addNewChannelPermission] = useComponentPermission(
|
||||
['add_new_channel'],
|
||||
role,
|
||||
);
|
||||
|
||||
const renderOptions = (): ReactNode[] => {
|
||||
const children: ReactNode[] = [];
|
||||
|
||||
if (!channels.loading && addNewChannelPermission) {
|
||||
children.push(
|
||||
<Select.Option key="add-new-channel" value="add-new-channel">
|
||||
<StyledCreateChannelOption>
|
||||
<PlusOutlined />
|
||||
Create a new channel
|
||||
</StyledCreateChannelOption>
|
||||
</Select.Option>,
|
||||
);
|
||||
}
|
||||
|
||||
if (
|
||||
channels.loading ||
|
||||
channels.payload === undefined ||
|
||||
@ -56,6 +87,7 @@ function ChannelSelect({
|
||||
|
||||
return children;
|
||||
};
|
||||
|
||||
return (
|
||||
<StyledSelect
|
||||
disabled={disabled}
|
||||
@ -65,6 +97,12 @@ function ChannelSelect({
|
||||
placeholder={t('placeholder_channel_select')}
|
||||
data-testid="alert-channel-select"
|
||||
value={currentValue}
|
||||
notFoundContent={channels.loading && <Spin size="small" />}
|
||||
onDropdownVisibleChange={(open): void => {
|
||||
if (open) {
|
||||
onDropdownOpen();
|
||||
}
|
||||
}}
|
||||
onChange={(value): void => {
|
||||
handleChange(value as string[]);
|
||||
}}
|
||||
|
@ -4,3 +4,10 @@ import styled from 'styled-components';
|
||||
export const StyledSelect = styled(Select)`
|
||||
border-radius: 4px;
|
||||
`;
|
||||
|
||||
export const StyledCreateChannelOption = styled.div`
|
||||
color: var(--bg-robin-500);
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 8px;
|
||||
`;
|
||||
|
@ -102,9 +102,9 @@ function RuleOptions({
|
||||
<Select.Option value="4">{t('option_notequal')}</Select.Option>
|
||||
</>
|
||||
)}
|
||||
|
||||
{/* the value 5 and 6 are reserved for above or equal and below or equal */}
|
||||
{ruleType === 'anomaly_rule' && (
|
||||
<Select.Option value="5">{t('option_above_below')}</Select.Option>
|
||||
<Select.Option value="7">{t('option_above_below')}</Select.Option>
|
||||
)}
|
||||
</InlineSelect>
|
||||
);
|
||||
|
@ -1,4 +1,4 @@
|
||||
import { useEffect, useRef, useState } from 'react';
|
||||
import { useCallback, useEffect, useState } from 'react';
|
||||
import { ErrorResponse, SuccessResponse } from 'types/api';
|
||||
|
||||
function useFetch<PayloadProps, FunctionParams>(
|
||||
@ -10,7 +10,7 @@ function useFetch<PayloadProps, FunctionParams>(
|
||||
(arg0: any): Promise<SuccessResponse<PayloadProps> | ErrorResponse>;
|
||||
},
|
||||
param?: FunctionParams,
|
||||
): State<PayloadProps | undefined> {
|
||||
): State<PayloadProps | undefined> & { refetch: () => Promise<void> } {
|
||||
const [state, setStates] = useState<State<PayloadProps | undefined>>({
|
||||
loading: true,
|
||||
success: null,
|
||||
@ -19,37 +19,28 @@ function useFetch<PayloadProps, FunctionParams>(
|
||||
payload: undefined,
|
||||
});
|
||||
|
||||
const loadingRef = useRef(0);
|
||||
|
||||
useEffect(() => {
|
||||
const fetchData = useCallback(async (): Promise<void> => {
|
||||
setStates((prev) => ({ ...prev, loading: true }));
|
||||
try {
|
||||
(async (): Promise<void> => {
|
||||
if (state.loading) {
|
||||
const response = await functions(param);
|
||||
const response = await functions(param);
|
||||
|
||||
if (loadingRef.current === 0) {
|
||||
loadingRef.current = 1;
|
||||
|
||||
if (response.statusCode === 200) {
|
||||
setStates({
|
||||
loading: false,
|
||||
error: false,
|
||||
success: true,
|
||||
payload: response.payload,
|
||||
errorMessage: '',
|
||||
});
|
||||
} else {
|
||||
setStates({
|
||||
loading: false,
|
||||
error: true,
|
||||
success: false,
|
||||
payload: undefined,
|
||||
errorMessage: response.error as string,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
})();
|
||||
if (response.statusCode === 200) {
|
||||
setStates({
|
||||
loading: false,
|
||||
error: false,
|
||||
success: true,
|
||||
payload: response.payload,
|
||||
errorMessage: '',
|
||||
});
|
||||
} else {
|
||||
setStates({
|
||||
loading: false,
|
||||
error: true,
|
||||
success: false,
|
||||
payload: undefined,
|
||||
errorMessage: response.error as string,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
setStates({
|
||||
payload: undefined,
|
||||
@ -59,13 +50,16 @@ function useFetch<PayloadProps, FunctionParams>(
|
||||
errorMessage: error as string,
|
||||
});
|
||||
}
|
||||
return (): void => {
|
||||
loadingRef.current = 1;
|
||||
};
|
||||
}, [functions, param, state.loading]);
|
||||
}, [functions, param]);
|
||||
|
||||
// Initial fetch
|
||||
useEffect(() => {
|
||||
fetchData();
|
||||
}, [fetchData]);
|
||||
|
||||
return {
|
||||
...state,
|
||||
refetch: fetchData,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -766,307 +766,6 @@ func buildFilterArrayQuery(_ context.Context, excludeMap map[string]struct{}, pa
|
||||
return args
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) {
|
||||
|
||||
var query string
|
||||
excludeMap := make(map[string]struct{})
|
||||
for _, e := range queryParams.Exclude {
|
||||
if e == constants.OperationRequest {
|
||||
excludeMap[constants.OperationDB] = struct{}{}
|
||||
continue
|
||||
}
|
||||
excludeMap[e] = struct{}{}
|
||||
}
|
||||
|
||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
||||
if len(queryParams.TraceID) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
||||
}
|
||||
if len(queryParams.ServiceName) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpRoute) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpHost) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpUrl) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
||||
}
|
||||
if len(queryParams.Operation) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
||||
}
|
||||
if len(queryParams.RPCMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
|
||||
}
|
||||
if len(queryParams.ResponseStatusCode) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
|
||||
}
|
||||
|
||||
if len(queryParams.MinDuration) != 0 {
|
||||
query = query + " AND durationNano >= @durationNanoMin"
|
||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
||||
}
|
||||
if len(queryParams.MaxDuration) != 0 {
|
||||
query = query + " AND durationNano <= @durationNanoMax"
|
||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
||||
}
|
||||
|
||||
if len(queryParams.SpanKind) != 0 {
|
||||
query = query + " AND kind = @kind"
|
||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
||||
}
|
||||
|
||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
||||
|
||||
traceFilterReponse := model.SpanFiltersResponse{
|
||||
Status: map[string]uint64{},
|
||||
Duration: map[string]uint64{},
|
||||
ServiceName: map[string]uint64{},
|
||||
Operation: map[string]uint64{},
|
||||
ResponseStatusCode: map[string]uint64{},
|
||||
RPCMethod: map[string]uint64{},
|
||||
HttpMethod: map[string]uint64{},
|
||||
HttpUrl: map[string]uint64{},
|
||||
HttpRoute: map[string]uint64{},
|
||||
HttpHost: map[string]uint64{},
|
||||
}
|
||||
|
||||
for _, e := range queryParams.GetFilters {
|
||||
switch e {
|
||||
case constants.TraceID:
|
||||
continue
|
||||
case constants.ServiceName:
|
||||
finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " GROUP BY serviceName"
|
||||
var dBResponse []model.DBResponseServiceName
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.ServiceName != "" {
|
||||
traceFilterReponse.ServiceName[service.ServiceName] = service.Count
|
||||
}
|
||||
}
|
||||
case constants.HttpRoute:
|
||||
finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " GROUP BY httpRoute"
|
||||
var dBResponse []model.DBResponseHttpRoute
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.HttpRoute != "" {
|
||||
traceFilterReponse.HttpRoute[service.HttpRoute] = service.Count
|
||||
}
|
||||
}
|
||||
case constants.HttpUrl:
|
||||
finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " GROUP BY httpUrl"
|
||||
var dBResponse []model.DBResponseHttpUrl
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.HttpUrl != "" {
|
||||
traceFilterReponse.HttpUrl[service.HttpUrl] = service.Count
|
||||
}
|
||||
}
|
||||
case constants.HttpMethod:
|
||||
finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " GROUP BY httpMethod"
|
||||
var dBResponse []model.DBResponseHttpMethod
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.HttpMethod != "" {
|
||||
traceFilterReponse.HttpMethod[service.HttpMethod] = service.Count
|
||||
}
|
||||
}
|
||||
case constants.HttpHost:
|
||||
finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " GROUP BY httpHost"
|
||||
var dBResponse []model.DBResponseHttpHost
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.HttpHost != "" {
|
||||
traceFilterReponse.HttpHost[service.HttpHost] = service.Count
|
||||
}
|
||||
}
|
||||
case constants.OperationRequest:
|
||||
finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " GROUP BY name"
|
||||
var dBResponse []model.DBResponseOperation
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.Operation != "" {
|
||||
traceFilterReponse.Operation[service.Operation] = service.Count
|
||||
}
|
||||
}
|
||||
case constants.Status:
|
||||
finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
var dBResponse []model.DBResponseTotal
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
|
||||
finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable)
|
||||
finalQuery2 += query
|
||||
var dBResponse2 []model.DBResponseTotal
|
||||
err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...)
|
||||
zap.L().Info(finalQuery2)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
if len(dBResponse) > 0 && len(dBResponse2) > 0 {
|
||||
traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": dBResponse[0].NumTotal}
|
||||
} else if len(dBResponse) > 0 {
|
||||
traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": dBResponse[0].NumTotal}
|
||||
} else if len(dBResponse2) > 0 {
|
||||
traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": 0}
|
||||
} else {
|
||||
traceFilterReponse.Status = map[string]uint64{"ok": 0, "error": 0}
|
||||
}
|
||||
case constants.Duration:
|
||||
err := r.featureFlags.CheckFeature(constants.DurationSort)
|
||||
durationSortEnabled := err == nil
|
||||
finalQuery := ""
|
||||
if !durationSortEnabled {
|
||||
// if duration sort is not enabled, we need to get the min and max duration from the index table
|
||||
finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
var dBResponse []model.DBResponseMinMax
|
||||
err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
if len(dBResponse) > 0 {
|
||||
traceFilterReponse.Duration = map[string]uint64{"minDuration": dBResponse[0].Min, "maxDuration": dBResponse[0].Max}
|
||||
}
|
||||
} else {
|
||||
// when duration sort is enabled, we need to get the min and max duration from the duration table
|
||||
finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable)
|
||||
finalQuery += query
|
||||
finalQuery += " ORDER BY durationNano LIMIT 1"
|
||||
var dBResponse []model.DBResponseTotal
|
||||
err = r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
|
||||
finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable)
|
||||
finalQuery += query
|
||||
finalQuery += " ORDER BY durationNano DESC LIMIT 1"
|
||||
var dBResponse2 []model.DBResponseTotal
|
||||
err = r.db.Select(ctx, &dBResponse2, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
if len(dBResponse) > 0 {
|
||||
traceFilterReponse.Duration["minDuration"] = dBResponse[0].NumTotal
|
||||
}
|
||||
if len(dBResponse2) > 0 {
|
||||
traceFilterReponse.Duration["maxDuration"] = dBResponse2[0].NumTotal
|
||||
}
|
||||
}
|
||||
case constants.RPCMethod:
|
||||
finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " GROUP BY rpcMethod"
|
||||
var dBResponse []model.DBResponseRPCMethod
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.RPCMethod != "" {
|
||||
traceFilterReponse.RPCMethod[service.RPCMethod] = service.Count
|
||||
}
|
||||
}
|
||||
|
||||
case constants.ResponseStatusCode:
|
||||
finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " GROUP BY responseStatusCode"
|
||||
var dBResponse []model.DBResponseStatusCodeMethod
|
||||
err := r.db.Select(ctx, &dBResponse, finalQuery, args...)
|
||||
zap.L().Info(finalQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.ResponseStatusCode != "" {
|
||||
traceFilterReponse.ResponseStatusCode[service.ResponseStatusCode] = service.Count
|
||||
}
|
||||
}
|
||||
|
||||
default:
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("filter type: %s not supported", e)}
|
||||
}
|
||||
}
|
||||
|
||||
return &traceFilterReponse, nil
|
||||
}
|
||||
|
||||
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
|
||||
|
||||
// status can only be two and if both are selected than they are equivalent to none selected
|
||||
@ -1088,140 +787,6 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string
|
||||
return query
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) {
|
||||
|
||||
queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable)
|
||||
|
||||
excludeMap := make(map[string]struct{})
|
||||
for _, e := range queryParams.Exclude {
|
||||
if e == constants.OperationRequest {
|
||||
excludeMap[constants.OperationDB] = struct{}{}
|
||||
continue
|
||||
}
|
||||
excludeMap[e] = struct{}{}
|
||||
}
|
||||
|
||||
var query string
|
||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
||||
if len(queryParams.TraceID) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
||||
}
|
||||
if len(queryParams.ServiceName) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpRoute) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpHost) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpUrl) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
||||
}
|
||||
if len(queryParams.Operation) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
||||
}
|
||||
if len(queryParams.RPCMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
|
||||
}
|
||||
|
||||
if len(queryParams.ResponseStatusCode) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
|
||||
}
|
||||
|
||||
if len(queryParams.MinDuration) != 0 {
|
||||
query = query + " AND durationNano >= @durationNanoMin"
|
||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
||||
}
|
||||
if len(queryParams.MaxDuration) != 0 {
|
||||
query = query + " AND durationNano <= @durationNanoMax"
|
||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
||||
}
|
||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
||||
|
||||
if len(queryParams.SpanKind) != 0 {
|
||||
query = query + " AND kind = @kind"
|
||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
||||
}
|
||||
|
||||
// create TagQuery from TagQueryParams
|
||||
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
|
||||
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
|
||||
query += subQuery
|
||||
args = append(args, argsSubQuery...)
|
||||
if errStatus != nil {
|
||||
return nil, errStatus
|
||||
}
|
||||
|
||||
if len(queryParams.OrderParam) != 0 {
|
||||
if queryParams.OrderParam == constants.Duration {
|
||||
queryTable = fmt.Sprintf("%s.%s", r.TraceDB, r.durationTable)
|
||||
if queryParams.Order == constants.Descending {
|
||||
query = query + " ORDER BY durationNano DESC"
|
||||
}
|
||||
if queryParams.Order == constants.Ascending {
|
||||
query = query + " ORDER BY durationNano ASC"
|
||||
}
|
||||
} else if queryParams.OrderParam == constants.Timestamp {
|
||||
projectionOptQuery := "SET allow_experimental_projection_optimization = 1"
|
||||
err := r.db.Exec(ctx, projectionOptQuery)
|
||||
|
||||
zap.L().Info(projectionOptQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
if queryParams.Order == constants.Descending {
|
||||
query = query + " ORDER BY timestamp DESC"
|
||||
}
|
||||
if queryParams.Order == constants.Ascending {
|
||||
query = query + " ORDER BY timestamp ASC"
|
||||
}
|
||||
}
|
||||
}
|
||||
if queryParams.Limit > 0 {
|
||||
query = query + " LIMIT @limit"
|
||||
args = append(args, clickhouse.Named("limit", queryParams.Limit))
|
||||
}
|
||||
|
||||
if queryParams.Offset > 0 {
|
||||
query = query + " OFFSET @offset"
|
||||
args = append(args, clickhouse.Named("offset", queryParams.Offset))
|
||||
}
|
||||
|
||||
var getFilterSpansResponseItems []model.GetFilterSpansResponseItem
|
||||
|
||||
baseQuery := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, durationNano, httpMethod, rpcMethod, responseStatusCode FROM %s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryTable)
|
||||
baseQuery += query
|
||||
err := r.db.Select(ctx, &getFilterSpansResponseItems, baseQuery, args...)
|
||||
// Fill status and method
|
||||
for i, e := range getFilterSpansResponseItems {
|
||||
if e.RPCMethod != "" {
|
||||
getFilterSpansResponseItems[i].Method = e.RPCMethod
|
||||
} else {
|
||||
getFilterSpansResponseItems[i].Method = e.HttpMethod
|
||||
}
|
||||
}
|
||||
|
||||
zap.L().Info(baseQuery)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
getFilterSpansResponse := model.GetFilterSpansResponse{
|
||||
Spans: getFilterSpansResponseItems,
|
||||
TotalSpans: 1000,
|
||||
}
|
||||
|
||||
return &getFilterSpansResponse, nil
|
||||
}
|
||||
|
||||
func createTagQueryFromTagQueryParams(queryParams []model.TagQueryParam) []model.TagQuery {
|
||||
tags := []model.TagQuery{}
|
||||
for _, tag := range queryParams {
|
||||
@ -1379,87 +944,6 @@ func addExistsOperator(item model.TagQuery, tagMapType string, not bool) (string
|
||||
return fmt.Sprintf(" AND %s (%s)", notStr, strings.Join(tagOperatorPair, " OR ")), args
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagFilters, *model.ApiError) {
|
||||
|
||||
excludeMap := make(map[string]struct{})
|
||||
for _, e := range queryParams.Exclude {
|
||||
if e == constants.OperationRequest {
|
||||
excludeMap[constants.OperationDB] = struct{}{}
|
||||
continue
|
||||
}
|
||||
excludeMap[e] = struct{}{}
|
||||
}
|
||||
|
||||
var query string
|
||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
||||
if len(queryParams.TraceID) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
||||
}
|
||||
if len(queryParams.ServiceName) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpRoute) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpHost) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpUrl) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
||||
}
|
||||
if len(queryParams.Operation) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
||||
}
|
||||
if len(queryParams.RPCMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
|
||||
}
|
||||
if len(queryParams.ResponseStatusCode) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
|
||||
}
|
||||
if len(queryParams.MinDuration) != 0 {
|
||||
query = query + " AND durationNano >= @durationNanoMin"
|
||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
||||
}
|
||||
if len(queryParams.MaxDuration) != 0 {
|
||||
query = query + " AND durationNano <= @durationNanoMax"
|
||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
||||
}
|
||||
if len(queryParams.SpanKind) != 0 {
|
||||
query = query + " AND kind = @kind"
|
||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
||||
}
|
||||
|
||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
||||
|
||||
tagFilters := []model.TagFilters{}
|
||||
|
||||
// Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
||||
finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
err := r.db.Select(ctx, &tagFilters, finalQuery, args...)
|
||||
|
||||
zap.L().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
tagFiltersResult := model.TagFilters{
|
||||
StringTagKeys: make([]string, 0),
|
||||
NumberTagKeys: make([]string, 0),
|
||||
BoolTagKeys: make([]string, 0),
|
||||
}
|
||||
if len(tagFilters) != 0 {
|
||||
tagFiltersResult.StringTagKeys = excludeTags(ctx, tagFilters[0].StringTagKeys)
|
||||
tagFiltersResult.NumberTagKeys = excludeTags(ctx, tagFilters[0].NumberTagKeys)
|
||||
tagFiltersResult.BoolTagKeys = excludeTags(ctx, tagFilters[0].BoolTagKeys)
|
||||
}
|
||||
return &tagFiltersResult, nil
|
||||
}
|
||||
|
||||
func excludeTags(_ context.Context, tags []string) []string {
|
||||
excludedTagsMap := map[string]bool{
|
||||
"http.code": true,
|
||||
@ -1483,102 +967,6 @@ func excludeTags(_ context.Context, tags []string) []string {
|
||||
return newTags
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*model.TagValues, *model.ApiError) {
|
||||
|
||||
if queryParams.TagKey.Type == model.TagTypeNumber {
|
||||
return &model.TagValues{
|
||||
NumberTagValues: make([]float64, 0),
|
||||
StringTagValues: make([]string, 0),
|
||||
BoolTagValues: make([]bool, 0),
|
||||
}, nil
|
||||
} else if queryParams.TagKey.Type == model.TagTypeBool {
|
||||
return &model.TagValues{
|
||||
NumberTagValues: make([]float64, 0),
|
||||
StringTagValues: make([]string, 0),
|
||||
BoolTagValues: []bool{true, false},
|
||||
}, nil
|
||||
}
|
||||
|
||||
excludeMap := make(map[string]struct{})
|
||||
for _, e := range queryParams.Exclude {
|
||||
if e == constants.OperationRequest {
|
||||
excludeMap[constants.OperationDB] = struct{}{}
|
||||
continue
|
||||
}
|
||||
excludeMap[e] = struct{}{}
|
||||
}
|
||||
|
||||
var query string
|
||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
||||
if len(queryParams.TraceID) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
||||
}
|
||||
if len(queryParams.ServiceName) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpRoute) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpHost) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpUrl) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
||||
}
|
||||
if len(queryParams.Operation) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
||||
}
|
||||
if len(queryParams.MinDuration) != 0 {
|
||||
query = query + " AND durationNano >= @durationNanoMin"
|
||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
||||
}
|
||||
if len(queryParams.MaxDuration) != 0 {
|
||||
query = query + " AND durationNano <= @durationNanoMax"
|
||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
||||
}
|
||||
if len(queryParams.SpanKind) != 0 {
|
||||
query = query + " AND kind = @kind"
|
||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
||||
}
|
||||
|
||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
||||
|
||||
tagValues := []model.TagValues{}
|
||||
|
||||
finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable)
|
||||
finalQuery += query
|
||||
finalQuery += " LIMIT @limit"
|
||||
|
||||
args = append(args, clickhouse.Named("key", queryParams.TagKey.Key))
|
||||
args = append(args, clickhouse.Named("limit", queryParams.Limit))
|
||||
err := r.db.Select(ctx, &tagValues, finalQuery, args...)
|
||||
|
||||
zap.L().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
cleanedTagValues := model.TagValues{
|
||||
StringTagValues: []string{},
|
||||
NumberTagValues: []float64{},
|
||||
BoolTagValues: []bool{},
|
||||
}
|
||||
if len(tagValues) == 0 {
|
||||
return &cleanedTagValues, nil
|
||||
}
|
||||
for _, e := range tagValues[0].StringTagValues {
|
||||
if e != "" {
|
||||
cleanedTagValues.StringTagValues = append(cleanedTagValues.StringTagValues, e)
|
||||
}
|
||||
}
|
||||
return &cleanedTagValues, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) {
|
||||
|
||||
namedArgs := []interface{}{
|
||||
@ -1823,185 +1211,6 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams *
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) {
|
||||
|
||||
excludeMap := make(map[string]struct{})
|
||||
for _, e := range queryParams.Exclude {
|
||||
if e == constants.OperationRequest {
|
||||
excludeMap[constants.OperationDB] = struct{}{}
|
||||
continue
|
||||
}
|
||||
excludeMap[e] = struct{}{}
|
||||
}
|
||||
|
||||
SpanAggregatesDBResponseItems := []model.SpanAggregatesDBResponseItem{}
|
||||
|
||||
aggregation_query := ""
|
||||
if queryParams.Dimension == "duration" {
|
||||
switch queryParams.AggregationOption {
|
||||
case "p50":
|
||||
aggregation_query = " quantile(0.50)(durationNano) as float64Value "
|
||||
case "p95":
|
||||
aggregation_query = " quantile(0.95)(durationNano) as float64Value "
|
||||
case "p90":
|
||||
aggregation_query = " quantile(0.90)(durationNano) as float64Value "
|
||||
case "p99":
|
||||
aggregation_query = " quantile(0.99)(durationNano) as float64Value "
|
||||
case "max":
|
||||
aggregation_query = " max(durationNano) as value "
|
||||
case "min":
|
||||
aggregation_query = " min(durationNano) as value "
|
||||
case "avg":
|
||||
aggregation_query = " avg(durationNano) as float64Value "
|
||||
case "sum":
|
||||
aggregation_query = " sum(durationNano) as value "
|
||||
default:
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("aggregate type: %s not supported", queryParams.AggregationOption)}
|
||||
}
|
||||
} else if queryParams.Dimension == "calls" {
|
||||
aggregation_query = " count(*) as value "
|
||||
}
|
||||
|
||||
args := []interface{}{clickhouse.Named("timestampL", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), clickhouse.Named("timestampU", strconv.FormatInt(queryParams.End.UnixNano(), 10))}
|
||||
|
||||
var query string
|
||||
var customStr []string
|
||||
_, columnExists := constants.GroupByColMap[queryParams.GroupBy]
|
||||
// Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet:
|
||||
// issue link: https://github.com/ClickHouse/clickhouse-go/issues/870
|
||||
if queryParams.GroupBy != "" && columnExists {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable)
|
||||
args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy))
|
||||
} else if queryParams.GroupBy != "" {
|
||||
customStr = strings.Split(queryParams.GroupBy, ".(")
|
||||
if len(customStr) < 2 {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
|
||||
}
|
||||
if customStr[1] == string(model.TagTypeString)+")" {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
||||
} else if customStr[1] == string(model.TagTypeNumber)+")" {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
||||
} else if customStr[1] == string(model.TagTypeBool)+")" {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable)
|
||||
} else {
|
||||
// return error for unsupported group by
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)}
|
||||
}
|
||||
} else {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable)
|
||||
}
|
||||
|
||||
if len(queryParams.TraceID) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.TraceID, constants.TraceID, &query, args)
|
||||
}
|
||||
if len(queryParams.ServiceName) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpRoute) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpHost) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
|
||||
}
|
||||
if len(queryParams.HttpUrl) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
|
||||
}
|
||||
if len(queryParams.Operation) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
|
||||
}
|
||||
if len(queryParams.RPCMethod) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.RPCMethod, constants.RPCMethod, &query, args)
|
||||
}
|
||||
if len(queryParams.ResponseStatusCode) > 0 {
|
||||
args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ResponseStatusCode, constants.ResponseStatusCode, &query, args)
|
||||
}
|
||||
if len(queryParams.MinDuration) != 0 {
|
||||
query = query + " AND durationNano >= @durationNanoMin"
|
||||
args = append(args, clickhouse.Named("durationNanoMin", queryParams.MinDuration))
|
||||
}
|
||||
if len(queryParams.MaxDuration) != 0 {
|
||||
query = query + " AND durationNano <= @durationNanoMax"
|
||||
args = append(args, clickhouse.Named("durationNanoMax", queryParams.MaxDuration))
|
||||
}
|
||||
query = getStatusFilters(query, queryParams.Status, excludeMap)
|
||||
|
||||
if len(queryParams.SpanKind) != 0 {
|
||||
query = query + " AND kind = @kind"
|
||||
args = append(args, clickhouse.Named("kind", queryParams.SpanKind))
|
||||
}
|
||||
// create TagQuery from TagQueryParams
|
||||
tags := createTagQueryFromTagQueryParams(queryParams.Tags)
|
||||
subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags)
|
||||
query += subQuery
|
||||
args = append(args, argsSubQuery...)
|
||||
|
||||
if errStatus != nil {
|
||||
return nil, errStatus
|
||||
}
|
||||
|
||||
if queryParams.GroupBy != "" && columnExists {
|
||||
query = query + fmt.Sprintf(" GROUP BY time, %s as groupBy ORDER BY time", queryParams.GroupBy)
|
||||
} else if queryParams.GroupBy != "" {
|
||||
if customStr[1] == string(model.TagTypeString)+")" {
|
||||
query = query + fmt.Sprintf(" GROUP BY time, stringTagMap['%s'] as groupBy ORDER BY time", customStr[0])
|
||||
} else if customStr[1] == string(model.TagTypeNumber)+")" {
|
||||
query = query + fmt.Sprintf(" GROUP BY time, toString(numberTagMap['%s']) as groupBy ORDER BY time", customStr[0])
|
||||
} else if customStr[1] == string(model.TagTypeBool)+")" {
|
||||
query = query + fmt.Sprintf(" GROUP BY time, toString(boolTagMap['%s']) as groupBy ORDER BY time", customStr[0])
|
||||
}
|
||||
} else {
|
||||
query = query + " GROUP BY time ORDER BY time"
|
||||
}
|
||||
|
||||
err := r.db.Select(ctx, &SpanAggregatesDBResponseItems, query, args...)
|
||||
|
||||
zap.L().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
GetFilteredSpansAggregatesResponse := model.GetFilteredSpansAggregatesResponse{
|
||||
Items: map[int64]model.SpanAggregatesResponseItem{},
|
||||
}
|
||||
|
||||
for i := range SpanAggregatesDBResponseItems {
|
||||
if SpanAggregatesDBResponseItems[i].Value == 0 {
|
||||
SpanAggregatesDBResponseItems[i].Value = uint64(SpanAggregatesDBResponseItems[i].Float64Value)
|
||||
}
|
||||
SpanAggregatesDBResponseItems[i].Timestamp = int64(SpanAggregatesDBResponseItems[i].Time.UnixNano())
|
||||
SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value)
|
||||
if queryParams.AggregationOption == "rate_per_sec" {
|
||||
SpanAggregatesDBResponseItems[i].FloatValue = float32(SpanAggregatesDBResponseItems[i].Value) / float32(queryParams.StepSeconds)
|
||||
}
|
||||
if responseElement, ok := GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp]; !ok {
|
||||
if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" {
|
||||
GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{
|
||||
Timestamp: SpanAggregatesDBResponseItems[i].Timestamp,
|
||||
GroupBy: map[string]float32{SpanAggregatesDBResponseItems[i].GroupBy: SpanAggregatesDBResponseItems[i].FloatValue},
|
||||
}
|
||||
} else if queryParams.GroupBy == "" {
|
||||
GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = model.SpanAggregatesResponseItem{
|
||||
Timestamp: SpanAggregatesDBResponseItems[i].Timestamp,
|
||||
Value: SpanAggregatesDBResponseItems[i].FloatValue,
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
if queryParams.GroupBy != "" && SpanAggregatesDBResponseItems[i].GroupBy != "" {
|
||||
responseElement.GroupBy[SpanAggregatesDBResponseItems[i].GroupBy] = SpanAggregatesDBResponseItems[i].FloatValue
|
||||
}
|
||||
GetFilteredSpansAggregatesResponse.Items[SpanAggregatesDBResponseItems[i].Timestamp] = responseElement
|
||||
}
|
||||
}
|
||||
|
||||
return &GetFilteredSpansAggregatesResponse, nil
|
||||
}
|
||||
|
||||
func getLocalTableName(tableName string) string {
|
||||
|
||||
tableNameSplit := strings.Split(tableName, ".")
|
||||
|
@ -526,12 +526,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/health", am.OpenAccess(aH.getHealth)).Methods(http.MethodGet)
|
||||
|
||||
router.HandleFunc("/api/v1/getSpanFilters", am.ViewAccess(aH.getSpanFilters)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/getTagFilters", am.ViewAccess(aH.getTagFilters)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/getFilteredSpans", am.ViewAccess(aH.getFilteredSpans)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/getFilteredSpans/aggregates", am.ViewAccess(aH.getFilteredSpanAggregates)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/getTagValues", am.ViewAccess(aH.getTagValues)).Methods(http.MethodPost)
|
||||
|
||||
router.HandleFunc("/api/v1/listErrors", am.ViewAccess(aH.listErrors)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/countErrors", am.ViewAccess(aH.countErrors)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/errorFromErrorID", am.ViewAccess(aH.getErrorFromErrorID)).Methods(http.MethodGet)
|
||||
@ -1847,86 +1841,6 @@ func (aH *APIHandler) getErrorFromGroupID(w http.ResponseWriter, r *http.Request
|
||||
aH.WriteJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getSpanFilters(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query, err := parseSpanFilterRequestBody(r)
|
||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetSpanFilters(r.Context(), query)
|
||||
|
||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
||||
aH.WriteJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getFilteredSpans(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query, err := parseFilteredSpansRequest(r, aH)
|
||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetFilteredSpans(r.Context(), query)
|
||||
|
||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
||||
aH.WriteJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getFilteredSpanAggregates(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query, err := parseFilteredSpanAggregatesRequest(r)
|
||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetFilteredSpansAggregates(r.Context(), query)
|
||||
|
||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
||||
aH.WriteJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getTagFilters(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query, err := parseTagFilterRequest(r)
|
||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetTagFilters(r.Context(), query)
|
||||
|
||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
||||
aH.WriteJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getTagValues(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query, err := parseTagValueRequest(r)
|
||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
result, apiErr := aH.reader.GetTagValues(r.Context(), query)
|
||||
|
||||
if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) {
|
||||
return
|
||||
}
|
||||
|
||||
aH.WriteJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) setTTL(w http.ResponseWriter, r *http.Request) {
|
||||
ttlParams, err := parseTTLParams(r)
|
||||
if aH.HandleError(w, err, http.StatusBadRequest) {
|
||||
|
@ -142,7 +142,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu
|
||||
}
|
||||
|
||||
// check if the field is present in the fields map
|
||||
for _, key := range utils.GenerateLogEnrichmentKeys(field) {
|
||||
for _, key := range utils.GenerateEnrichmentKeys(field) {
|
||||
if val, ok := fields[key]; ok {
|
||||
return val
|
||||
}
|
||||
|
@ -384,6 +384,11 @@ func LogCommentEnricher(next http.Handler) http.Handler {
|
||||
client = "api"
|
||||
}
|
||||
|
||||
email, err := auth.GetEmailFromJwt(r.Context())
|
||||
if err != nil {
|
||||
zap.S().Errorf("error while getting email from jwt: %v", err)
|
||||
}
|
||||
|
||||
kvs := map[string]string{
|
||||
"path": path,
|
||||
"dashboardID": dashboardID,
|
||||
@ -392,6 +397,7 @@ func LogCommentEnricher(next http.Handler) http.Handler {
|
||||
"client": client,
|
||||
"viewName": viewName,
|
||||
"servicesTab": tab,
|
||||
"email": email,
|
||||
}
|
||||
|
||||
r = r.WithContext(context.WithValue(r.Context(), common.LogCommentKey, kvs))
|
||||
|
118
pkg/query-service/app/traces/v4/enrich.go
Normal file
118
pkg/query-service/app/traces/v4/enrich.go
Normal file
@ -0,0 +1,118 @@
|
||||
package v4
|
||||
|
||||
import (
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils"
|
||||
)
|
||||
|
||||
// if the field is timestamp/id/value we don't need to enrich
|
||||
// if the field is static we don't need to enrich
|
||||
// for all others we need to enrich
|
||||
// an attribute/resource can be materialized/dematerialized
|
||||
// but the query should work regardless and shouldn't fail
|
||||
func isEnriched(field v3.AttributeKey) bool {
|
||||
// if it is timestamp/id dont check
|
||||
if field.Key == "timestamp" || field.Key == constants.SigNozOrderByValue {
|
||||
return true
|
||||
}
|
||||
|
||||
// we need to check if the field is static and return false if isColumn is not set
|
||||
if _, ok := constants.StaticFieldsTraces[field.Key]; ok && field.IsColumn {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) v3.AttributeKey {
|
||||
if isEnriched(key) {
|
||||
return key
|
||||
}
|
||||
|
||||
if v, ok := constants.StaticFieldsTraces[key.Key]; ok {
|
||||
return v
|
||||
}
|
||||
|
||||
for _, key := range utils.GenerateEnrichmentKeys(key) {
|
||||
if val, ok := keys[key]; ok {
|
||||
return val
|
||||
}
|
||||
}
|
||||
|
||||
// enrich with default values if metadata is not found
|
||||
if key.Type == "" {
|
||||
key.Type = v3.AttributeKeyTypeTag
|
||||
}
|
||||
if key.DataType == "" {
|
||||
key.DataType = v3.AttributeKeyDataTypeString
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
func Enrich(params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) {
|
||||
if params.CompositeQuery.QueryType != v3.QueryTypeBuilder {
|
||||
return
|
||||
}
|
||||
|
||||
for _, query := range params.CompositeQuery.BuilderQueries {
|
||||
if query.DataSource == v3.DataSourceTraces {
|
||||
EnrichTracesQuery(query, keys)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func EnrichTracesQuery(query *v3.BuilderQuery, keys map[string]v3.AttributeKey) {
|
||||
// enrich aggregate attribute
|
||||
query.AggregateAttribute = enrichKeyWithMetadata(query.AggregateAttribute, keys)
|
||||
|
||||
// enrich filter items
|
||||
if query.Filters != nil && len(query.Filters.Items) > 0 {
|
||||
for idx, filter := range query.Filters.Items {
|
||||
query.Filters.Items[idx].Key = enrichKeyWithMetadata(filter.Key, keys)
|
||||
// if the serviceName column is used, use the corresponding resource attribute as well during filtering
|
||||
// since there is only one of these resource attributes we are adding it here directly.
|
||||
// move it somewhere else if this list is big
|
||||
if filter.Key.Key == "serviceName" {
|
||||
query.Filters.Items[idx].Key = v3.AttributeKey{
|
||||
Key: "service.name",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeResource,
|
||||
IsColumn: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// enrich group by
|
||||
for idx, groupBy := range query.GroupBy {
|
||||
query.GroupBy[idx] = enrichKeyWithMetadata(groupBy, keys)
|
||||
}
|
||||
|
||||
// enrich order by
|
||||
query.OrderBy = enrichOrderBy(query.OrderBy, keys)
|
||||
|
||||
// enrich select columns
|
||||
for idx, selectColumn := range query.SelectColumns {
|
||||
query.SelectColumns[idx] = enrichKeyWithMetadata(selectColumn, keys)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func enrichOrderBy(items []v3.OrderBy, keys map[string]v3.AttributeKey) []v3.OrderBy {
|
||||
enrichedItems := []v3.OrderBy{}
|
||||
for i := 0; i < len(items); i++ {
|
||||
attributeKey := enrichKeyWithMetadata(v3.AttributeKey{
|
||||
Key: items[i].ColumnName,
|
||||
}, keys)
|
||||
enrichedItems = append(enrichedItems, v3.OrderBy{
|
||||
ColumnName: items[i].ColumnName,
|
||||
Order: items[i].Order,
|
||||
Key: attributeKey.Key,
|
||||
DataType: attributeKey.DataType,
|
||||
Type: attributeKey.Type,
|
||||
IsColumn: attributeKey.IsColumn,
|
||||
})
|
||||
}
|
||||
return enrichedItems
|
||||
}
|
196
pkg/query-service/app/traces/v4/enrich_test.go
Normal file
196
pkg/query-service/app/traces/v4/enrich_test.go
Normal file
@ -0,0 +1,196 @@
|
||||
package v4
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
func TestEnrichTracesQuery(t *testing.T) {
|
||||
type args struct {
|
||||
query *v3.BuilderQuery
|
||||
keys map[string]v3.AttributeKey
|
||||
want *v3.BuilderQuery
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
}{
|
||||
{
|
||||
name: "test 1",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: ">"},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
keys: map[string]v3.AttributeKey{
|
||||
"bytes##tag##int64": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag},
|
||||
},
|
||||
want: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64}, Value: 100, Operator: ">"},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test service name",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "serviceName", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "myservice", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "serviceName"}, Value: "myservice", Operator: "="},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
keys: map[string]v3.AttributeKey{},
|
||||
want: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test mat attrs",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "/api", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "msgSystem"}, Value: "name", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "external_http_url"}, Value: "name", Operator: "="},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
keys: map[string]v3.AttributeKey{},
|
||||
want: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "name", Operator: "="},
|
||||
{Key: v3.AttributeKey{Key: "external_http_url", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "name", Operator: "="},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "test aggregateattr, filter, groupby, order by",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
AggregateOperator: v3.AggregateOperatorCount,
|
||||
AggregateAttribute: v3.AttributeKey{
|
||||
Key: "http.route",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString}, Value: "/api", Operator: "="},
|
||||
},
|
||||
},
|
||||
GroupBy: []v3.AttributeKey{
|
||||
{Key: "http.route", DataType: v3.AttributeKeyDataTypeString},
|
||||
{Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{
|
||||
{ColumnName: "httpRoute", Order: v3.DirectionAsc},
|
||||
},
|
||||
},
|
||||
keys: map[string]v3.AttributeKey{
|
||||
"http.route##tag##string": {Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
|
||||
},
|
||||
want: &v3.BuilderQuery{
|
||||
AggregateAttribute: v3.AttributeKey{
|
||||
Key: "http.route",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
IsColumn: true,
|
||||
},
|
||||
Filters: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="},
|
||||
},
|
||||
},
|
||||
GroupBy: []v3.AttributeKey{
|
||||
{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
|
||||
{Key: "msgSystem", DataType: v3.AttributeKeyDataTypeString, IsJSON: false, IsColumn: true},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{
|
||||
{Key: "httpRoute", Order: v3.DirectionAsc, ColumnName: "httpRoute", DataType: v3.AttributeKeyDataTypeString, IsColumn: true},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "enrich default values",
|
||||
args: args{
|
||||
query: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{
|
||||
{Key: v3.AttributeKey{Key: "testattr"}},
|
||||
},
|
||||
},
|
||||
OrderBy: []v3.OrderBy{{ColumnName: "timestamp", Order: v3.DirectionAsc}},
|
||||
},
|
||||
keys: map[string]v3.AttributeKey{},
|
||||
want: &v3.BuilderQuery{
|
||||
Filters: &v3.FilterSet{
|
||||
Items: []v3.FilterItem{{Key: v3.AttributeKey{Key: "testattr", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeString}}},
|
||||
},
|
||||
// isColumn won't matter in timestamp as it will always be a column
|
||||
OrderBy: []v3.OrderBy{{Key: "timestamp", Order: v3.DirectionAsc, ColumnName: "timestamp"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
EnrichTracesQuery(tt.args.query, tt.args.keys)
|
||||
// Check AggregateAttribute
|
||||
if tt.args.query.AggregateAttribute.Key != "" && !reflect.DeepEqual(tt.args.query.AggregateAttribute, tt.args.want.AggregateAttribute) {
|
||||
t.Errorf("EnrichTracesQuery() AggregateAttribute = %v, want %v", tt.args.query.AggregateAttribute, tt.args.want.AggregateAttribute)
|
||||
}
|
||||
|
||||
// Check Filters
|
||||
if tt.args.query.Filters != nil && !reflect.DeepEqual(tt.args.query.Filters, tt.args.want.Filters) {
|
||||
t.Errorf("EnrichTracesQuery() Filters = %v, want %v", tt.args.query.Filters, tt.args.want.Filters)
|
||||
}
|
||||
|
||||
// Check GroupBy
|
||||
if tt.args.query.GroupBy != nil && !reflect.DeepEqual(tt.args.query.GroupBy, tt.args.want.GroupBy) {
|
||||
t.Errorf("EnrichTracesQuery() GroupBy = %v, want %v", tt.args.query.GroupBy, tt.args.want.GroupBy)
|
||||
}
|
||||
|
||||
// Check OrderBy
|
||||
if tt.args.query.OrderBy != nil && !reflect.DeepEqual(tt.args.query.OrderBy, tt.args.want.OrderBy) {
|
||||
t.Errorf("EnrichTracesQuery() OrderBy = %v, want %v", tt.args.query.OrderBy, tt.args.want.OrderBy)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -124,6 +124,27 @@ func Test_getColumnName(t *testing.T) {
|
||||
},
|
||||
want: "attributes_string['xyz']",
|
||||
},
|
||||
{
|
||||
name: "new composite column",
|
||||
args: args{
|
||||
key: v3.AttributeKey{Key: "response_status_code"},
|
||||
},
|
||||
want: "response_status_code",
|
||||
},
|
||||
{
|
||||
name: "new composite column with metadata",
|
||||
args: args{
|
||||
key: v3.AttributeKey{Key: "response_status_code", DataType: v3.AttributeKeyDataTypeString, IsColumn: true},
|
||||
},
|
||||
want: "response_status_code",
|
||||
},
|
||||
{
|
||||
name: "new normal column with metadata",
|
||||
args: args{
|
||||
key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true},
|
||||
},
|
||||
want: "`attribute_string_http$$route`",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
@ -589,6 +589,60 @@ var StaticFieldsTraces = map[string]v3.AttributeKey{
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
|
||||
// new support
|
||||
"response_status_code": {
|
||||
Key: "response_status_code",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
"external_http_url": {
|
||||
Key: "external_http_url",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
"http_url": {
|
||||
Key: "http_url",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
"external_http_method": {
|
||||
Key: "external_http_method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
"http_method": {
|
||||
Key: "http_method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
"http_host": {
|
||||
Key: "http_host",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
"db_name": {
|
||||
Key: "db_name",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
"db_operation": {
|
||||
Key: "db_operation",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
"has_error": {
|
||||
Key: "has_error",
|
||||
DataType: v3.AttributeKeyDataTypeBool,
|
||||
IsColumn: true,
|
||||
},
|
||||
"is_remote": {
|
||||
Key: "is_remote",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
IsColumn: true,
|
||||
},
|
||||
// the simple attributes are not present here as
|
||||
// they are taken care by new format <attribute_type>_<attribute_datatype>_'<attribute_key>'
|
||||
}
|
||||
|
||||
const TRACE_V4_MAX_PAGINATION_LIMIT = 10000
|
||||
|
@ -183,7 +183,7 @@ func PrepareFilters(labels map[string]string, whereClauseItems []v3.FilterItem,
|
||||
var attrFound bool
|
||||
|
||||
// as of now this logic will only apply for logs
|
||||
for _, tKey := range utils.GenerateLogEnrichmentKeys(v3.AttributeKey{Key: key}) {
|
||||
for _, tKey := range utils.GenerateEnrichmentKeys(v3.AttributeKey{Key: key}) {
|
||||
if val, ok := keys[tKey]; ok {
|
||||
attributeKey = val
|
||||
attrFound = true
|
||||
|
@ -29,15 +29,10 @@ type Reader interface {
|
||||
// GetDisks returns a list of disks configured in the underlying DB. It is supported by
|
||||
// clickhouse only.
|
||||
GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError)
|
||||
GetSpanFilters(ctx context.Context, query *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError)
|
||||
GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
|
||||
GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
|
||||
GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error)
|
||||
GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error)
|
||||
GetTagFilters(ctx context.Context, query *model.TagFilterParams) (*model.TagFilters, *model.ApiError)
|
||||
GetTagValues(ctx context.Context, query *model.TagFilterParams) (*model.TagValues, *model.ApiError)
|
||||
GetFilteredSpans(ctx context.Context, query *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError)
|
||||
GetFilteredSpansAggregates(ctx context.Context, query *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError)
|
||||
|
||||
ListErrors(ctx context.Context, params *model.ListErrorsParams) (*[]model.Error, *model.ApiError)
|
||||
CountErrors(ctx context.Context, params *model.CountErrorsParams) (uint64, *model.ApiError)
|
||||
|
@ -463,9 +463,9 @@ func (r *BaseRule) ShouldAlert(series v3.Series) (Sample, bool) {
|
||||
}
|
||||
} else if r.compareOp() == ValueOutsideBounds {
|
||||
for _, smpl := range series.Points {
|
||||
if math.Abs(smpl.Value) >= r.targetVal() {
|
||||
if math.Abs(smpl.Value) < r.targetVal() {
|
||||
alertSmpl = Sample{Point: Point{V: smpl.Value}, Metric: lbls}
|
||||
shouldAlert = true
|
||||
shouldAlert = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -347,6 +347,7 @@ func createTelemetry() {
|
||||
"alertsWithTSV2": alertsInfo.AlertsWithTSV2,
|
||||
"logsBasedAlerts": alertsInfo.LogsBasedAlerts,
|
||||
"metricBasedAlerts": alertsInfo.MetricBasedAlerts,
|
||||
"anomalyBasedAlerts": alertsInfo.AnomalyBasedAlerts,
|
||||
"tracesBasedAlerts": alertsInfo.TracesBasedAlerts,
|
||||
"totalChannels": alertsInfo.TotalChannels,
|
||||
"totalSavedViews": savedViewsInfo.TotalSavedViews,
|
||||
|
@ -35,13 +35,15 @@ func GetListTsRanges(start, end int64) []LogsListTsRange {
|
||||
tStartNano = startNano
|
||||
}
|
||||
}
|
||||
} else {
|
||||
result = append(result, LogsListTsRange{Start: start, End: end})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// This tries to see all possible fields that it can fall back to if some meta is missing
|
||||
// check Test_GenerateLogEnrichmentKeys for example
|
||||
func GenerateLogEnrichmentKeys(field v3.AttributeKey) []string {
|
||||
// check Test_GenerateEnrichmentKeys for example
|
||||
func GenerateEnrichmentKeys(field v3.AttributeKey) []string {
|
||||
names := []string{}
|
||||
if field.Type != v3.AttributeKeyTypeUnspecified && field.DataType != v3.AttributeKeyDataTypeUnspecified {
|
||||
names = append(names, field.Key+"##"+field.Type.String()+"##"+field.DataType.String())
|
||||
|
@ -18,7 +18,7 @@ func TestListTsRange(t *testing.T) {
|
||||
name: "testing for less then one hour",
|
||||
start: 1722262800000000000, // July 29, 2024 7:50:00 PM
|
||||
end: 1722263800000000000, // July 29, 2024 8:06:40 PM
|
||||
res: []LogsListTsRange{},
|
||||
res: []LogsListTsRange{{1722262800000000000, 1722263800000000000}},
|
||||
},
|
||||
{
|
||||
name: "testing for more than one hour",
|
||||
@ -53,7 +53,7 @@ func TestListTsRange(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_GenerateLogEnrichmentKeys(t *testing.T) {
|
||||
func Test_GenerateEnrichmentKeys(t *testing.T) {
|
||||
type args struct {
|
||||
field v3.AttributeKey
|
||||
}
|
||||
@ -96,8 +96,8 @@ func Test_GenerateLogEnrichmentKeys(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := GenerateLogEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("generateLogEnrichmentKeys() = %v, want %v", got, tt.want)
|
||||
if got := GenerateEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("GenerateEnrichmentKeys() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user