mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-25 08:34:25 +08:00
commit
e186474414
4
.github/workflows/build.yaml
vendored
4
.github/workflows/build.yaml
vendored
@ -33,7 +33,9 @@ jobs:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v3
|
||||
- name: Create .env file
|
||||
run: echo 'INTERCOM_APP_ID="${{ secrets.INTERCOM_APP_ID }}"' > frontend/.env
|
||||
run: |
|
||||
echo 'INTERCOM_APP_ID="${{ secrets.INTERCOM_APP_ID }}"' > frontend/.env
|
||||
echo 'SEGMENT_ID="${{ secrets.SEGMENT_ID }}"' >> frontend/.env
|
||||
- name: Install dependencies
|
||||
run: cd frontend && yarn install
|
||||
- name: Run ESLint
|
||||
|
4
.github/workflows/push.yaml
vendored
4
.github/workflows/push.yaml
vendored
@ -130,7 +130,9 @@ jobs:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v3
|
||||
- name: Create .env file
|
||||
run: echo 'INTERCOM_APP_ID="${{ secrets.INTERCOM_APP_ID }}"' > frontend/.env
|
||||
run: |
|
||||
echo 'INTERCOM_APP_ID="${{ secrets.INTERCOM_APP_ID }}"' > frontend/.env
|
||||
echo 'SEGMENT_ID="${{ secrets.SEGMENT_ID }}"' >> frontend/.env
|
||||
- name: Install dependencies
|
||||
working-directory: frontend
|
||||
run: yarn install
|
||||
|
@ -144,7 +144,7 @@ services:
|
||||
condition: on-failure
|
||||
|
||||
query-service:
|
||||
image: signoz/query-service:0.29.2
|
||||
image: signoz/query-service:0.29.3
|
||||
command:
|
||||
[
|
||||
"-config=/root/config/prometheus.yml",
|
||||
@ -184,7 +184,7 @@ services:
|
||||
<<: *clickhouse-depend
|
||||
|
||||
frontend:
|
||||
image: signoz/frontend:0.29.2
|
||||
image: signoz/frontend:0.29.3
|
||||
deploy:
|
||||
restart_policy:
|
||||
condition: on-failure
|
||||
|
@ -162,7 +162,7 @@ services:
|
||||
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
|
||||
|
||||
query-service:
|
||||
image: signoz/query-service:${DOCKER_TAG:-0.29.2}
|
||||
image: signoz/query-service:${DOCKER_TAG:-0.29.3}
|
||||
container_name: signoz-query-service
|
||||
command:
|
||||
[
|
||||
@ -201,7 +201,7 @@ services:
|
||||
<<: *clickhouse-depend
|
||||
|
||||
frontend:
|
||||
image: signoz/frontend:${DOCKER_TAG:-0.29.2}
|
||||
image: signoz/frontend:${DOCKER_TAG:-0.29.3}
|
||||
container_name: signoz-frontend
|
||||
restart: on-failure
|
||||
depends_on:
|
||||
|
@ -24,7 +24,7 @@ COPY . .
|
||||
RUN yarn build
|
||||
|
||||
|
||||
FROM nginx:1.24.0-alpine
|
||||
FROM nginx:1.25.2-alpine
|
||||
|
||||
COPY conf/default.conf /etc/nginx/conf.d/default.conf
|
||||
|
||||
|
@ -76,9 +76,9 @@ function App(): JSX.Element {
|
||||
|
||||
useEffect(() => {
|
||||
if (isLoggedInState && user && user.userId && user.email) {
|
||||
window.analytics.identify(user?.userId, {
|
||||
email: user?.email || '',
|
||||
name: user?.name || '',
|
||||
window.analytics.identify(user?.email, {
|
||||
email: user?.email,
|
||||
name: user?.name,
|
||||
});
|
||||
}
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
|
@ -0,0 +1,20 @@
|
||||
import { Tag } from 'antd';
|
||||
import styled from 'styled-components';
|
||||
|
||||
export const TagContainer = styled(Tag)`
|
||||
&&& {
|
||||
border-radius: 0.25rem;
|
||||
padding: 0.063rem 0.5rem;
|
||||
font-weight: 600;
|
||||
font-size: 0.75rem;
|
||||
line-height: 1.25rem;
|
||||
}
|
||||
`;
|
||||
|
||||
export const TagLabel = styled.span`
|
||||
font-weight: 400;
|
||||
`;
|
||||
|
||||
export const TagValue = styled.span`
|
||||
text-transform: capitalize;
|
||||
`;
|
@ -1,6 +1,6 @@
|
||||
import { blue } from '@ant-design/colors';
|
||||
import { Tag } from 'antd';
|
||||
|
||||
import { TagContainer, TagLabel, TagValue } from './FieldRenderer.styles';
|
||||
import { FieldRendererProps } from './LogDetailedView.types';
|
||||
import { getFieldAttributes } from './utils';
|
||||
|
||||
@ -12,8 +12,14 @@ function FieldRenderer({ field }: FieldRendererProps): JSX.Element {
|
||||
{dataType && newField && logType ? (
|
||||
<>
|
||||
<span style={{ color: blue[4] }}>{newField} </span>
|
||||
<Tag>Type: {logType}</Tag>
|
||||
<Tag>Data type: {dataType}</Tag>
|
||||
<TagContainer>
|
||||
<TagLabel>Type: </TagLabel>
|
||||
<TagValue>{logType}</TagValue>
|
||||
</TagContainer>
|
||||
<TagContainer>
|
||||
<TagLabel>Data type: </TagLabel>
|
||||
<TagValue>{dataType}</TagValue>
|
||||
</TagContainer>
|
||||
</>
|
||||
) : (
|
||||
<span style={{ color: blue[4] }}>{field}</span>
|
||||
|
@ -3,7 +3,8 @@
|
||||
import './APM.styles.scss';
|
||||
|
||||
import cx from 'classnames';
|
||||
import { useState } from 'react';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
import GoLang from './GoLang/GoLang';
|
||||
import Java from './Java/Java';
|
||||
@ -36,6 +37,15 @@ export default function APM({
|
||||
}): JSX.Element {
|
||||
const [selectedLanguage, setSelectedLanguage] = useState('java');
|
||||
|
||||
useEffect(() => {
|
||||
// on language select
|
||||
trackEvent('Onboarding: APM', {
|
||||
selectedLanguage,
|
||||
activeStep,
|
||||
});
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [selectedLanguage]);
|
||||
|
||||
const renderSelectedLanguageSetupInstructions = (): JSX.Element => {
|
||||
switch (selectedLanguage) {
|
||||
case 'java':
|
||||
|
@ -3,7 +3,8 @@ import './Java.styles.scss';
|
||||
import { MDXProvider } from '@mdx-js/react';
|
||||
import { Form, Input, Select } from 'antd';
|
||||
import Header from 'container/OnboardingContainer/common/Header/Header';
|
||||
import { useState } from 'react';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
import ConnectionStatus from '../common/ConnectionStatus/ConnectionStatus';
|
||||
import JavaDocs from './md-docs/java.md';
|
||||
@ -27,6 +28,14 @@ export default function Java({
|
||||
|
||||
const [form] = Form.useForm();
|
||||
|
||||
useEffect(() => {
|
||||
// on language select
|
||||
trackEvent('Onboarding: APM : Java', {
|
||||
selectedFrameWork,
|
||||
});
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [selectedFrameWork]);
|
||||
|
||||
const renderDocs = (): JSX.Element => {
|
||||
switch (selectedFrameWork) {
|
||||
case 'tomcat':
|
||||
|
@ -3,7 +3,8 @@ import './Javascript.styles.scss';
|
||||
import { MDXProvider } from '@mdx-js/react';
|
||||
import { Form, Input, Select } from 'antd';
|
||||
import Header from 'container/OnboardingContainer/common/Header/Header';
|
||||
import { useState } from 'react';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
import ConnectionStatus from '../common/ConnectionStatus/ConnectionStatus';
|
||||
import ExpressDocs from './md-docs/express.md';
|
||||
@ -25,6 +26,14 @@ export default function Javascript({
|
||||
|
||||
const [form] = Form.useForm();
|
||||
|
||||
useEffect(() => {
|
||||
// on language select
|
||||
trackEvent('Onboarding: APM : Javascript', {
|
||||
selectedFrameWork,
|
||||
});
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [selectedFrameWork]);
|
||||
|
||||
const renderDocs = (): JSX.Element => {
|
||||
switch (selectedFrameWork) {
|
||||
case 'nodejs':
|
||||
|
@ -3,7 +3,8 @@ import './Python.styles.scss';
|
||||
import { MDXProvider } from '@mdx-js/react';
|
||||
import { Form, Input, Select } from 'antd';
|
||||
import Header from 'container/OnboardingContainer/common/Header/Header';
|
||||
import { useState } from 'react';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
import ConnectionStatus from '../common/ConnectionStatus/ConnectionStatus';
|
||||
import DjangoDocs from './md-docs/django.md';
|
||||
@ -29,6 +30,14 @@ export default function Python({
|
||||
|
||||
const [form] = Form.useForm();
|
||||
|
||||
useEffect(() => {
|
||||
// on language select
|
||||
trackEvent('Onboarding: APM : Python', {
|
||||
selectedFrameWork,
|
||||
});
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [selectedFrameWork]);
|
||||
|
||||
const renderDocs = (): JSX.Element => {
|
||||
switch (selectedFrameWork) {
|
||||
case 'django':
|
||||
|
@ -16,6 +16,7 @@ import { UPDATE_TIME_INTERVAL } from 'types/actions/globalTime';
|
||||
import { PayloadProps as QueryServicePayloadProps } from 'types/api/metrics/getService';
|
||||
import { GlobalReducer } from 'types/reducer/globalTime';
|
||||
import { Tags } from 'types/reducer/trace';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
interface ConnectionStatusProps {
|
||||
serviceName: string;
|
||||
@ -112,6 +113,10 @@ export default function ConnectionStatus({
|
||||
if (data || isError) {
|
||||
setRetryCount(retryCount - 1);
|
||||
if (retryCount < 0) {
|
||||
trackEvent('❌ Onboarding: APM: Connection Status', {
|
||||
serviceName,
|
||||
status: 'Failed',
|
||||
});
|
||||
setLoading(false);
|
||||
}
|
||||
}
|
||||
@ -122,6 +127,11 @@ export default function ConnectionStatus({
|
||||
setLoading(false);
|
||||
setIsReceivingData(true);
|
||||
|
||||
trackEvent('✅ Onboarding: APM: Connection Status', {
|
||||
serviceName,
|
||||
status: 'Successful',
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -130,31 +140,35 @@ export default function ConnectionStatus({
|
||||
|
||||
// Use useEffect to update query parameters when the polling interval lapses
|
||||
useEffect(() => {
|
||||
const pollingTimer = setInterval(() => {
|
||||
// Trigger a refetch with the updated parameters
|
||||
const updatedMinTime = (Date.now() - 15 * 60 * 1000) * 1000000;
|
||||
const updatedMaxTime = Date.now() * 1000000;
|
||||
let pollingTimer: string | number | NodeJS.Timer | undefined;
|
||||
|
||||
const payload = {
|
||||
maxTime: updatedMaxTime,
|
||||
minTime: updatedMinTime,
|
||||
selectedTime,
|
||||
};
|
||||
if (loading) {
|
||||
pollingTimer = setInterval(() => {
|
||||
// Trigger a refetch with the updated parameters
|
||||
const updatedMinTime = (Date.now() - 15 * 60 * 1000) * 1000000;
|
||||
const updatedMaxTime = Date.now() * 1000000;
|
||||
|
||||
dispatch({
|
||||
type: UPDATE_TIME_INTERVAL,
|
||||
payload,
|
||||
});
|
||||
const payload = {
|
||||
maxTime: updatedMaxTime,
|
||||
minTime: updatedMinTime,
|
||||
selectedTime,
|
||||
};
|
||||
|
||||
// refetch(updatedParams);
|
||||
}, pollingInterval); // Same interval as pollingInterval
|
||||
dispatch({
|
||||
type: UPDATE_TIME_INTERVAL,
|
||||
payload,
|
||||
});
|
||||
}, pollingInterval); // Same interval as pollingInterval
|
||||
} else if (!loading && pollingTimer) {
|
||||
clearInterval(pollingTimer);
|
||||
}
|
||||
|
||||
// Clean up the interval when the component unmounts
|
||||
return (): void => {
|
||||
clearInterval(pollingTimer);
|
||||
};
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [refetch, selectedTags, selectedTime]);
|
||||
}, [refetch, selectedTags, selectedTime, loading]);
|
||||
|
||||
useEffect(() => {
|
||||
verifyApplicationData(data);
|
||||
|
@ -1,7 +1,8 @@
|
||||
import { MDXProvider } from '@mdx-js/react';
|
||||
import { Select } from 'antd';
|
||||
import Header from 'container/OnboardingContainer/common/Header/Header';
|
||||
import { useState } from 'react';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
import FluentBit from './md-docs/fluentBit.md';
|
||||
import FluentD from './md-docs/fluentD.md';
|
||||
@ -16,6 +17,14 @@ enum FrameworksMap {
|
||||
export default function ExistingCollectors(): JSX.Element {
|
||||
const [selectedFrameWork, setSelectedFrameWork] = useState('fluent_d');
|
||||
|
||||
useEffect(() => {
|
||||
// on language select
|
||||
trackEvent('Onboarding: Logs Management: Existing Collectors', {
|
||||
selectedFrameWork,
|
||||
});
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [selectedFrameWork]);
|
||||
|
||||
const renderDocs = (): JSX.Element => {
|
||||
switch (selectedFrameWork) {
|
||||
case 'fluent_d':
|
||||
|
@ -4,7 +4,8 @@
|
||||
import './LogsManagement.styles.scss';
|
||||
|
||||
import cx from 'classnames';
|
||||
import { useState } from 'react';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
import ApplicationLogs from './ApplicationLogs/ApplicationLogs';
|
||||
import Docker from './Docker/Docker';
|
||||
@ -60,6 +61,15 @@ export default function LogsManagement({
|
||||
}): JSX.Element {
|
||||
const [selectedLogsType, setSelectedLogsType] = useState('kubernetes');
|
||||
|
||||
useEffect(() => {
|
||||
// on language select
|
||||
trackEvent('Onboarding: Logs Management', {
|
||||
selectedLogsType,
|
||||
activeStep,
|
||||
});
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [selectedLogsType]);
|
||||
|
||||
const renderSelectedLanguageSetupInstructions = ():
|
||||
| JSX.Element
|
||||
| undefined => {
|
||||
|
@ -16,6 +16,7 @@ import { DataTypes } from 'types/api/queryBuilder/queryAutocompleteResponse';
|
||||
import { Query } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import { EQueryType } from 'types/common/dashboard';
|
||||
import { DataSource } from 'types/common/queryBuilder';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
interface ConnectionStatusProps {
|
||||
logType: string;
|
||||
@ -95,6 +96,10 @@ export default function LogsConnectionStatus({
|
||||
setRetryCount(retryCount - 1);
|
||||
|
||||
if (retryCount < 0) {
|
||||
trackEvent('❌ Onboarding: Logs Management: Connection Status', {
|
||||
status: 'Failed',
|
||||
});
|
||||
|
||||
setLoading(false);
|
||||
setPollingInterval(false);
|
||||
}
|
||||
@ -123,6 +128,11 @@ export default function LogsConnectionStatus({
|
||||
setIsReceivingData(true);
|
||||
setRetryCount(-1);
|
||||
setPollingInterval(false);
|
||||
|
||||
trackEvent('✅ Onboarding: Logs Management: Connection Status', {
|
||||
status: 'Successful',
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,8 @@ import ROUTES from 'constants/routes';
|
||||
import { useIsDarkMode } from 'hooks/useDarkMode';
|
||||
import history from 'lib/history';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { useEffectOnce } from 'react-use';
|
||||
import { trackEvent } from 'utils/segmentAnalytics';
|
||||
|
||||
import APM from './APM/APM';
|
||||
import InfrastructureMonitoring from './InfrastructureMonitoring/InfrastructureMonitoring';
|
||||
@ -98,6 +100,10 @@ export default function Onboarding(): JSX.Element {
|
||||
},
|
||||
];
|
||||
|
||||
useEffectOnce(() => {
|
||||
trackEvent('Onboarding Started');
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
if (selectedModule?.id === ModulesMap.InfrastructureMonitoring) {
|
||||
setsteps([...baseSteps]);
|
||||
@ -123,27 +129,55 @@ export default function Onboarding(): JSX.Element {
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [selectedModule, selectedLogsType]);
|
||||
|
||||
useEffect(() => {
|
||||
// on select
|
||||
trackEvent('Onboarding: Module Selected', {
|
||||
selectedModule: selectedModule.id,
|
||||
});
|
||||
}, [selectedModule]);
|
||||
|
||||
const handleNext = (): void => {
|
||||
// Need to add logic to validate service name and then allow next step transition in APM module
|
||||
const isFormValid = true;
|
||||
|
||||
if (isFormValid && activeStep <= 3) {
|
||||
setActiveStep(activeStep + 1);
|
||||
const nextStep = activeStep + 1;
|
||||
|
||||
// on next
|
||||
trackEvent('Onboarding: Next', {
|
||||
selectedModule: selectedModule.id,
|
||||
nextStepId: nextStep,
|
||||
});
|
||||
|
||||
setActiveStep(nextStep);
|
||||
setCurrent(current + 1);
|
||||
}
|
||||
};
|
||||
|
||||
const handlePrev = (): void => {
|
||||
if (activeStep >= 1) {
|
||||
const prevStep = activeStep - 1;
|
||||
|
||||
// on prev
|
||||
trackEvent('Onboarding: Back', {
|
||||
module: selectedModule.id,
|
||||
prevStepId: prevStep,
|
||||
});
|
||||
|
||||
setCurrent(current - 1);
|
||||
setActiveStep(activeStep - 1);
|
||||
setActiveStep(prevStep);
|
||||
}
|
||||
};
|
||||
|
||||
const handleOnboardingComplete = (): void => {
|
||||
trackEvent('Onboarding Complete', {
|
||||
module: selectedModule.id,
|
||||
});
|
||||
|
||||
switch (selectedModule.id) {
|
||||
case ModulesMap.APM:
|
||||
history.push(ROUTES.APPLICATION);
|
||||
@ -160,8 +194,15 @@ export default function Onboarding(): JSX.Element {
|
||||
};
|
||||
|
||||
const handleStepChange = (value: number): void => {
|
||||
const stepId = value + 1;
|
||||
|
||||
trackEvent('Onboarding: Step Change', {
|
||||
module: selectedModule.id,
|
||||
step: stepId,
|
||||
});
|
||||
|
||||
setCurrent(value);
|
||||
setActiveStep(value + 1);
|
||||
setActiveStep(stepId);
|
||||
};
|
||||
|
||||
const handleModuleSelect = (module: ModuleProps): void => {
|
||||
|
@ -4,7 +4,7 @@ function trackPageView(pageName: string): void {
|
||||
|
||||
function trackEvent(
|
||||
eventName: string,
|
||||
properties: Record<string, string>,
|
||||
properties?: Record<string, unknown>,
|
||||
): void {
|
||||
window.analytics.track(eventName, properties);
|
||||
}
|
||||
|
@ -21,7 +21,11 @@ const sassLoader = 'sass-loader';
|
||||
const styleLoader = 'style-loader';
|
||||
|
||||
const plugins = [
|
||||
new HtmlWebpackPlugin({ template: 'src/index.html.ejs' }),
|
||||
new HtmlWebpackPlugin({
|
||||
template: 'src/index.html.ejs',
|
||||
INTERCOM_APP_ID: process.env.INTERCOM_APP_ID,
|
||||
SEGMENT_ID: process.env.SEGMENT_ID,
|
||||
}),
|
||||
new CompressionPlugin({
|
||||
exclude: /.map$/,
|
||||
}),
|
||||
@ -35,6 +39,7 @@ const plugins = [
|
||||
'process.env': JSON.stringify({
|
||||
FRONTEND_API_ENDPOINT: process.env.FRONTEND_API_ENDPOINT,
|
||||
INTERCOM_APP_ID: process.env.INTERCOM_APP_ID,
|
||||
SEGMENT_ID: process.env.SEGMENT_ID,
|
||||
}),
|
||||
}),
|
||||
new MiniCssExtractPlugin(),
|
||||
|
@ -28,7 +28,7 @@ func NewLogParsingPipelinesController(db *sqlx.DB, engine string) (*LogParsingPi
|
||||
type PipelinesResponse struct {
|
||||
*agentConf.ConfigVersion
|
||||
|
||||
Pipelines []model.Pipeline `json:"pipelines"`
|
||||
Pipelines []Pipeline `json:"pipelines"`
|
||||
History []agentConf.ConfigVersion `json:"history"`
|
||||
}
|
||||
|
||||
@ -43,7 +43,7 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
|
||||
return nil, model.UnauthorizedError(errors.Wrap(authErr, "failed to get userId from context"))
|
||||
}
|
||||
|
||||
var pipelines []model.Pipeline
|
||||
var pipelines []Pipeline
|
||||
|
||||
// scan through postable pipelines, to select the existing pipelines or insert missing ones
|
||||
for _, r := range postable {
|
||||
|
@ -41,7 +41,7 @@ func (r *Repo) InitDB(engine string) error {
|
||||
// insertPipeline stores a given postable pipeline to database
|
||||
func (r *Repo) insertPipeline(
|
||||
ctx context.Context, postable *PostablePipeline,
|
||||
) (*model.Pipeline, *model.ApiError) {
|
||||
) (*Pipeline, *model.ApiError) {
|
||||
if err := postable.IsValid(); err != nil {
|
||||
return nil, model.BadRequest(errors.Wrap(err,
|
||||
"pipeline is not valid",
|
||||
@ -65,7 +65,7 @@ func (r *Repo) insertPipeline(
|
||||
return nil, model.UnauthorizedError(err)
|
||||
}
|
||||
|
||||
insertRow := &model.Pipeline{
|
||||
insertRow := &Pipeline{
|
||||
Id: uuid.New().String(),
|
||||
OrderId: postable.OrderId,
|
||||
Enabled: postable.Enabled,
|
||||
@ -75,7 +75,7 @@ func (r *Repo) insertPipeline(
|
||||
Filter: postable.Filter,
|
||||
Config: postable.Config,
|
||||
RawConfig: string(rawConfig),
|
||||
Creator: model.Creator{
|
||||
Creator: Creator{
|
||||
CreatedBy: claims["email"].(string),
|
||||
CreatedAt: time.Now(),
|
||||
},
|
||||
@ -107,9 +107,11 @@ func (r *Repo) insertPipeline(
|
||||
}
|
||||
|
||||
// getPipelinesByVersion returns pipelines associated with a given version
|
||||
func (r *Repo) getPipelinesByVersion(ctx context.Context, version int) ([]model.Pipeline, []error) {
|
||||
func (r *Repo) getPipelinesByVersion(
|
||||
ctx context.Context, version int,
|
||||
) ([]Pipeline, []error) {
|
||||
var errors []error
|
||||
pipelines := []model.Pipeline{}
|
||||
pipelines := []Pipeline{}
|
||||
|
||||
versionQuery := `SELECT r.id,
|
||||
r.name,
|
||||
@ -151,8 +153,8 @@ func (r *Repo) getPipelinesByVersion(ctx context.Context, version int) ([]model.
|
||||
// GetPipelines returns pipeline and errors (if any)
|
||||
func (r *Repo) GetPipeline(
|
||||
ctx context.Context, id string,
|
||||
) (*model.Pipeline, *model.ApiError) {
|
||||
pipelines := []model.Pipeline{}
|
||||
) (*Pipeline, *model.ApiError) {
|
||||
pipelines := []Pipeline{}
|
||||
|
||||
pipelineQuery := `SELECT id,
|
||||
name,
|
||||
|
@ -1,21 +1,22 @@
|
||||
package model
|
||||
package logparsingpipeline
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
// Pipeline is stored and also deployed finally to collector config
|
||||
type Pipeline struct {
|
||||
Id string `json:"id,omitempty" db:"id"`
|
||||
OrderId int `json:"orderId" db:"order_id"`
|
||||
Name string `json:"name,omitempty" db:"name"`
|
||||
Alias string `json:"alias" db:"alias"`
|
||||
Description *string `json:"description" db:"description"`
|
||||
Enabled bool `json:"enabled" db:"enabled"`
|
||||
Filter string `json:"filter" db:"filter"`
|
||||
Id string `json:"id,omitempty" db:"id"`
|
||||
OrderId int `json:"orderId" db:"order_id"`
|
||||
Name string `json:"name,omitempty" db:"name"`
|
||||
Alias string `json:"alias" db:"alias"`
|
||||
Description *string `json:"description" db:"description"`
|
||||
Enabled bool `json:"enabled" db:"enabled"`
|
||||
Filter *v3.FilterSet `json:"filter" db:"filter"`
|
||||
|
||||
// configuration for pipeline
|
||||
RawConfig string `db:"config_json" json:"-"`
|
@ -1,15 +1,20 @@
|
||||
package logparsingpipeline
|
||||
|
||||
import (
|
||||
"github.com/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
)
|
||||
|
||||
const (
|
||||
NOOP = "noop"
|
||||
)
|
||||
|
||||
func PreparePipelineProcessor(pipelines []model.Pipeline) (map[string]interface{}, []string, error) {
|
||||
func CollectorConfProcessorName(p Pipeline) string {
|
||||
return constants.LogsPPLPfx + p.Alias
|
||||
}
|
||||
|
||||
func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []string, error) {
|
||||
processors := map[string]interface{}{}
|
||||
names := []string{}
|
||||
for _, v := range pipelines {
|
||||
@ -21,14 +26,20 @@ func PreparePipelineProcessor(pipelines []model.Pipeline) (map[string]interface{
|
||||
if len(operators) == 0 {
|
||||
continue
|
||||
}
|
||||
router := []model.PipelineOperator{
|
||||
|
||||
filterExpr, err := queryBuilderToExpr.Parse(v.Filter)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "failed to parse pipeline filter")
|
||||
}
|
||||
|
||||
router := []PipelineOperator{
|
||||
{
|
||||
ID: "router_signoz",
|
||||
Type: "router",
|
||||
Routes: &[]model.Route{
|
||||
Routes: &[]Route{
|
||||
{
|
||||
Output: v.Config[0].ID,
|
||||
Expr: v.Filter,
|
||||
Expr: filterExpr,
|
||||
},
|
||||
},
|
||||
Default: NOOP,
|
||||
@ -38,24 +49,24 @@ func PreparePipelineProcessor(pipelines []model.Pipeline) (map[string]interface{
|
||||
v.Config = append(router, operators...)
|
||||
|
||||
// noop operator is needed as the default operator so that logs are not dropped
|
||||
noop := model.PipelineOperator{
|
||||
noop := PipelineOperator{
|
||||
ID: NOOP,
|
||||
Type: NOOP,
|
||||
}
|
||||
v.Config = append(v.Config, noop)
|
||||
|
||||
processor := model.Processor{
|
||||
processor := Processor{
|
||||
Operators: v.Config,
|
||||
}
|
||||
name := constants.LogsPPLPfx + v.Alias
|
||||
name := CollectorConfProcessorName(v)
|
||||
processors[name] = processor
|
||||
names = append(names, name)
|
||||
}
|
||||
return processors, names, nil
|
||||
}
|
||||
|
||||
func getOperators(ops []model.PipelineOperator) []model.PipelineOperator {
|
||||
filteredOp := []model.PipelineOperator{}
|
||||
func getOperators(ops []PipelineOperator) []PipelineOperator {
|
||||
filteredOp := []PipelineOperator{}
|
||||
for i, operator := range ops {
|
||||
if operator.Enabled {
|
||||
if len(filteredOp) > 0 {
|
||||
|
@ -4,17 +4,16 @@ import (
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
var prepareProcessorTestData = []struct {
|
||||
Name string
|
||||
Operators []model.PipelineOperator
|
||||
Output []model.PipelineOperator
|
||||
Operators []PipelineOperator
|
||||
Output []PipelineOperator
|
||||
}{
|
||||
{
|
||||
Name: "Last operator disabled",
|
||||
Operators: []model.PipelineOperator{
|
||||
Operators: []PipelineOperator{
|
||||
{
|
||||
ID: "t1",
|
||||
Name: "t1",
|
||||
@ -27,7 +26,7 @@ var prepareProcessorTestData = []struct {
|
||||
Enabled: false,
|
||||
},
|
||||
},
|
||||
Output: []model.PipelineOperator{
|
||||
Output: []PipelineOperator{
|
||||
{
|
||||
ID: "t1",
|
||||
Name: "t1",
|
||||
@ -37,7 +36,7 @@ var prepareProcessorTestData = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Operator in middle disabled",
|
||||
Operators: []model.PipelineOperator{
|
||||
Operators: []PipelineOperator{
|
||||
{
|
||||
ID: "t1",
|
||||
Name: "t1",
|
||||
@ -56,7 +55,7 @@ var prepareProcessorTestData = []struct {
|
||||
Enabled: true,
|
||||
},
|
||||
},
|
||||
Output: []model.PipelineOperator{
|
||||
Output: []PipelineOperator{
|
||||
{
|
||||
ID: "t1",
|
||||
Name: "t1",
|
||||
@ -72,7 +71,7 @@ var prepareProcessorTestData = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Single operator disabled",
|
||||
Operators: []model.PipelineOperator{
|
||||
Operators: []PipelineOperator{
|
||||
{
|
||||
ID: "t1",
|
||||
Name: "t1",
|
||||
@ -80,18 +79,18 @@ var prepareProcessorTestData = []struct {
|
||||
Enabled: false,
|
||||
},
|
||||
},
|
||||
Output: []model.PipelineOperator{},
|
||||
Output: []PipelineOperator{},
|
||||
},
|
||||
{
|
||||
Name: "Single operator enabled",
|
||||
Operators: []model.PipelineOperator{
|
||||
Operators: []PipelineOperator{
|
||||
{
|
||||
ID: "t1",
|
||||
Name: "t1",
|
||||
Enabled: true,
|
||||
},
|
||||
},
|
||||
Output: []model.PipelineOperator{
|
||||
Output: []PipelineOperator{
|
||||
{
|
||||
ID: "t1",
|
||||
Name: "t1",
|
||||
@ -101,12 +100,12 @@ var prepareProcessorTestData = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Empty operator",
|
||||
Operators: []model.PipelineOperator{},
|
||||
Output: []model.PipelineOperator{},
|
||||
Operators: []PipelineOperator{},
|
||||
Output: []PipelineOperator{},
|
||||
},
|
||||
{
|
||||
Name: "new test",
|
||||
Operators: []model.PipelineOperator{
|
||||
Operators: []PipelineOperator{
|
||||
{
|
||||
ID: "move_filename",
|
||||
Output: "move_function",
|
||||
@ -137,7 +136,7 @@ var prepareProcessorTestData = []struct {
|
||||
Name: "move_lwp",
|
||||
},
|
||||
},
|
||||
Output: []model.PipelineOperator{
|
||||
Output: []PipelineOperator{
|
||||
{
|
||||
ID: "move_filename",
|
||||
Output: "move_line",
|
||||
@ -165,7 +164,7 @@ var prepareProcessorTestData = []struct {
|
||||
},
|
||||
{
|
||||
Name: "first op disabled",
|
||||
Operators: []model.PipelineOperator{
|
||||
Operators: []PipelineOperator{
|
||||
{
|
||||
ID: "move_filename",
|
||||
Output: "move_function",
|
||||
@ -178,7 +177,7 @@ var prepareProcessorTestData = []struct {
|
||||
Name: "move_function",
|
||||
},
|
||||
},
|
||||
Output: []model.PipelineOperator{
|
||||
Output: []PipelineOperator{
|
||||
{
|
||||
ID: "move_function",
|
||||
Enabled: true,
|
||||
|
@ -6,9 +6,8 @@ import (
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/antonmedv/expr"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
)
|
||||
|
||||
// PostablePipelines are a list of user defined pielines
|
||||
@ -19,14 +18,14 @@ type PostablePipelines struct {
|
||||
// PostablePipeline captures user inputs in setting the pipeline
|
||||
|
||||
type PostablePipeline struct {
|
||||
Id string `json:"id"`
|
||||
OrderId int `json:"orderId"`
|
||||
Name string `json:"name"`
|
||||
Alias string `json:"alias"`
|
||||
Description string `json:"description"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Filter string `json:"filter"`
|
||||
Config []model.PipelineOperator `json:"config"`
|
||||
Id string `json:"id"`
|
||||
OrderId int `json:"orderId"`
|
||||
Name string `json:"name"`
|
||||
Alias string `json:"alias"`
|
||||
Description string `json:"description"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Filter *v3.FilterSet `json:"filter"`
|
||||
Config []PipelineOperator `json:"config"`
|
||||
}
|
||||
|
||||
// IsValid checks if postable pipeline has all the required params
|
||||
@ -42,12 +41,8 @@ func (p *PostablePipeline) IsValid() error {
|
||||
return fmt.Errorf("pipeline alias is required")
|
||||
}
|
||||
|
||||
if p.Filter == "" {
|
||||
return fmt.Errorf("pipeline filter is required")
|
||||
}
|
||||
|
||||
// check the expression
|
||||
_, err := expr.Compile(p.Filter, expr.AsBool(), expr.AllowUndefinedVariables())
|
||||
// check the filter
|
||||
_, err := queryBuilderToExpr.Parse(p.Filter)
|
||||
if err != nil {
|
||||
return fmt.Errorf(fmt.Sprintf("filter for pipeline %v is not correct: %v", p.Name, err.Error()))
|
||||
}
|
||||
@ -95,7 +90,7 @@ func (p *PostablePipeline) IsValid() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func isValidOperator(op model.PipelineOperator) error {
|
||||
func isValidOperator(op PipelineOperator) error {
|
||||
if op.ID == "" {
|
||||
return errors.New("PipelineOperator.ID is required.")
|
||||
}
|
||||
|
@ -4,74 +4,102 @@ import (
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
var correctQueriesTest = []struct {
|
||||
Name string
|
||||
Pipeline PostablePipeline
|
||||
IsValid bool
|
||||
}{
|
||||
{
|
||||
Name: "No orderId",
|
||||
Pipeline: PostablePipeline{
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
Config: []model.PipelineOperator{},
|
||||
},
|
||||
IsValid: false,
|
||||
},
|
||||
{
|
||||
Name: "Invalid orderId",
|
||||
Pipeline: PostablePipeline{
|
||||
OrderId: 0,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
Config: []model.PipelineOperator{},
|
||||
},
|
||||
IsValid: false,
|
||||
},
|
||||
{
|
||||
Name: "Valid orderId",
|
||||
Pipeline: PostablePipeline{
|
||||
OrderId: 1,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
Config: []model.PipelineOperator{},
|
||||
},
|
||||
IsValid: true,
|
||||
},
|
||||
{
|
||||
Name: "Invalid filter",
|
||||
Pipeline: PostablePipeline{
|
||||
OrderId: 1,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "test filter",
|
||||
},
|
||||
IsValid: false,
|
||||
},
|
||||
{
|
||||
Name: "Valid filter",
|
||||
Pipeline: PostablePipeline{
|
||||
OrderId: 1,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
},
|
||||
IsValid: true,
|
||||
},
|
||||
}
|
||||
|
||||
func TestIsValidPostablePipeline(t *testing.T) {
|
||||
validPipelineFilterSet := &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var correctQueriesTest = []struct {
|
||||
Name string
|
||||
Pipeline PostablePipeline
|
||||
IsValid bool
|
||||
}{
|
||||
{
|
||||
Name: "No orderId",
|
||||
Pipeline: PostablePipeline{
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: validPipelineFilterSet,
|
||||
Config: []PipelineOperator{},
|
||||
},
|
||||
IsValid: false,
|
||||
},
|
||||
{
|
||||
Name: "Invalid orderId",
|
||||
Pipeline: PostablePipeline{
|
||||
OrderId: 0,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: validPipelineFilterSet,
|
||||
Config: []PipelineOperator{},
|
||||
},
|
||||
IsValid: false,
|
||||
},
|
||||
{
|
||||
Name: "Valid orderId",
|
||||
Pipeline: PostablePipeline{
|
||||
OrderId: 1,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: validPipelineFilterSet,
|
||||
Config: []PipelineOperator{},
|
||||
},
|
||||
IsValid: true,
|
||||
},
|
||||
{
|
||||
Name: "Invalid filter",
|
||||
Pipeline: PostablePipeline{
|
||||
OrderId: 1,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeUnspecified,
|
||||
},
|
||||
Operator: "regex",
|
||||
Value: "[0-9A-Z*",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
IsValid: false,
|
||||
},
|
||||
{
|
||||
Name: "Valid filter",
|
||||
Pipeline: PostablePipeline{
|
||||
OrderId: 1,
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: validPipelineFilterSet,
|
||||
},
|
||||
IsValid: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range correctQueriesTest {
|
||||
Convey(test.Name, t, func() {
|
||||
err := test.Pipeline.IsValid()
|
||||
@ -86,12 +114,12 @@ func TestIsValidPostablePipeline(t *testing.T) {
|
||||
|
||||
var operatorTest = []struct {
|
||||
Name string
|
||||
Operator model.PipelineOperator
|
||||
Operator PipelineOperator
|
||||
IsValid bool
|
||||
}{
|
||||
{
|
||||
Name: "Operator - without id",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
Type: "remove",
|
||||
Field: "attributes.abc",
|
||||
},
|
||||
@ -99,7 +127,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Operator - without type",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "test",
|
||||
Field: "attributes.abc",
|
||||
},
|
||||
@ -107,7 +135,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Copy - invalid to and from",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "copy",
|
||||
Type: "copy",
|
||||
From: "date",
|
||||
@ -117,7 +145,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Move - invalid to and from",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "move",
|
||||
Type: "move",
|
||||
From: "attributes",
|
||||
@ -127,7 +155,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Add - invalid to and from",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Field: "data",
|
||||
@ -136,7 +164,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Remove - invalid to and from",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "remove",
|
||||
Type: "remove",
|
||||
Field: "data",
|
||||
@ -145,7 +173,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Add - valid",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "add",
|
||||
Type: "add",
|
||||
Field: "body",
|
||||
@ -155,7 +183,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Move - valid",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "move",
|
||||
Type: "move",
|
||||
From: "attributes.x1",
|
||||
@ -165,7 +193,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Copy - valid",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "copy",
|
||||
Type: "copy",
|
||||
From: "resource.x1",
|
||||
@ -175,7 +203,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Unknown operator",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "copy",
|
||||
Type: "operator",
|
||||
From: "resource.x1",
|
||||
@ -185,7 +213,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Grok - valid",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "grok",
|
||||
Type: "grok_parser",
|
||||
Pattern: "%{COMMONAPACHELOG}",
|
||||
@ -195,7 +223,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Grok - invalid",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "grok",
|
||||
Type: "grok_parser",
|
||||
Pattern: "%{COMMONAPACHELOG}",
|
||||
@ -205,7 +233,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Regex - valid",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "regex",
|
||||
Type: "regex_parser",
|
||||
Regex: "(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$",
|
||||
@ -215,7 +243,7 @@ var operatorTest = []struct {
|
||||
},
|
||||
{
|
||||
Name: "Regex - invalid",
|
||||
Operator: model.PipelineOperator{
|
||||
Operator: PipelineOperator{
|
||||
ID: "regex",
|
||||
Type: "regex_parser",
|
||||
Regex: "abcd",
|
||||
|
@ -1,6 +1,7 @@
|
||||
package v3
|
||||
|
||||
import (
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
@ -8,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
|
||||
@ -529,6 +531,22 @@ func (f *FilterSet) Validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// For serializing to and from db
|
||||
func (f *FilterSet) Scan(src interface{}) error {
|
||||
if data, ok := src.([]byte); ok {
|
||||
return json.Unmarshal(data, &f)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FilterSet) Value() (driver.Value, error) {
|
||||
filterSetJson, err := json.Marshal(f)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not serialize FilterSet to JSON")
|
||||
}
|
||||
return filterSetJson, nil
|
||||
}
|
||||
|
||||
type FilterOperator string
|
||||
|
||||
const (
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/open-telemetry/opamp-go/protobufs"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.signoz.io/signoz/pkg/query-service/agentConf"
|
||||
"go.signoz.io/signoz/pkg/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
@ -28,7 +29,10 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
|
||||
"golang.org/x/exp/maps"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
@ -46,6 +50,21 @@ func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
)
|
||||
|
||||
// Should be able to create pipelines config
|
||||
pipelineFilterSet := &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
postablePipelines := logparsingpipeline.PostablePipelines{
|
||||
Pipelines: []logparsingpipeline.PostablePipeline{
|
||||
{
|
||||
@ -53,8 +72,8 @@ func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
Name: "pipeline1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
Config: []model.PipelineOperator{
|
||||
Filter: pipelineFilterSet,
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
@ -70,8 +89,8 @@ func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
Name: "pipeline2",
|
||||
Alias: "pipeline2",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
Config: []model.PipelineOperator{
|
||||
Filter: pipelineFilterSet,
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "remove",
|
||||
@ -150,6 +169,21 @@ func TestLogPipelinesLifecycle(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLogPipelinesValidation(t *testing.T) {
|
||||
validPipelineFilterSet := &v3.FilterSet{
|
||||
Operator: "AND",
|
||||
Items: []v3.FilterItem{
|
||||
{
|
||||
Key: v3.AttributeKey{
|
||||
Key: "method",
|
||||
DataType: v3.AttributeKeyDataTypeString,
|
||||
Type: v3.AttributeKeyTypeTag,
|
||||
},
|
||||
Operator: "=",
|
||||
Value: "GET",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
Name string
|
||||
Pipeline logparsingpipeline.PostablePipeline
|
||||
@ -162,8 +196,8 @@ func TestLogPipelinesValidation(t *testing.T) {
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
Config: []model.PipelineOperator{
|
||||
Filter: validPipelineFilterSet,
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
@ -184,8 +218,8 @@ func TestLogPipelinesValidation(t *testing.T) {
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
Config: []model.PipelineOperator{
|
||||
Filter: validPipelineFilterSet,
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
@ -206,8 +240,8 @@ func TestLogPipelinesValidation(t *testing.T) {
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "bad filter",
|
||||
Config: []model.PipelineOperator{
|
||||
Filter: &v3.FilterSet{},
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
@ -228,8 +262,8 @@ func TestLogPipelinesValidation(t *testing.T) {
|
||||
Name: "pipeline 1",
|
||||
Alias: "pipeline1",
|
||||
Enabled: true,
|
||||
Filter: "attributes.method == \"GET\"",
|
||||
Config: []model.PipelineOperator{
|
||||
Filter: validPipelineFilterSet,
|
||||
Config: []logparsingpipeline.PipelineOperator{
|
||||
{
|
||||
OrderId: 1,
|
||||
ID: "add",
|
||||
@ -411,32 +445,32 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline
|
||||
}
|
||||
|
||||
func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient(
|
||||
pipelines []model.Pipeline,
|
||||
pipelines []logparsingpipeline.Pipeline,
|
||||
) {
|
||||
lastMsg := tb.opampClientConn.latestMsgFromServer()
|
||||
otelConfigFiles := lastMsg.RemoteConfig.Config.ConfigMap
|
||||
collectorConfigFiles := lastMsg.RemoteConfig.Config.ConfigMap
|
||||
assert.Equal(
|
||||
tb.t, len(otelConfigFiles), 1,
|
||||
tb.t, len(collectorConfigFiles), 1,
|
||||
"otel config sent to client is expected to contain atleast 1 file",
|
||||
)
|
||||
|
||||
otelConfigYaml := maps.Values(otelConfigFiles)[0].Body
|
||||
otelConfSentToClient, err := yaml.Parser().Unmarshal(otelConfigYaml)
|
||||
collectorConfigYaml := maps.Values(collectorConfigFiles)[0].Body
|
||||
collectorConfSentToClient, err := yaml.Parser().Unmarshal(collectorConfigYaml)
|
||||
if err != nil {
|
||||
tb.t.Fatalf("could not unmarshal config file sent to opamp client: %v", err)
|
||||
}
|
||||
|
||||
// Each pipeline is expected to become its own processor
|
||||
// in the logs service in otel collector config.
|
||||
otelConfSvcs := otelConfSentToClient["service"].(map[string]interface{})
|
||||
otelConfLogsSvc := otelConfSvcs["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})
|
||||
otelConfLogsSvcProcessorNames := otelConfLogsSvc["processors"].([]interface{})
|
||||
otelConfLogsPipelineProcNames := []string{}
|
||||
for _, procNameVal := range otelConfLogsSvcProcessorNames {
|
||||
collectorConfSvcs := collectorConfSentToClient["service"].(map[string]interface{})
|
||||
collectorConfLogsSvc := collectorConfSvcs["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})
|
||||
collectorConfLogsSvcProcessorNames := collectorConfLogsSvc["processors"].([]interface{})
|
||||
collectorConfLogsPipelineProcNames := []string{}
|
||||
for _, procNameVal := range collectorConfLogsSvcProcessorNames {
|
||||
procName := procNameVal.(string)
|
||||
if strings.HasPrefix(procName, constants.LogsPPLPfx) {
|
||||
otelConfLogsPipelineProcNames = append(
|
||||
otelConfLogsPipelineProcNames,
|
||||
collectorConfLogsPipelineProcNames = append(
|
||||
collectorConfLogsPipelineProcNames,
|
||||
procName,
|
||||
)
|
||||
}
|
||||
@ -444,16 +478,40 @@ func (tb *LogPipelinesTestBed) assertPipelinesSentToOpampClient(
|
||||
|
||||
_, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)
|
||||
assert.Equal(
|
||||
tb.t, expectedLogProcessorNames, otelConfLogsPipelineProcNames,
|
||||
tb.t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames,
|
||||
"config sent to opamp client doesn't contain expected log pipelines",
|
||||
)
|
||||
|
||||
otelConfProcessors := otelConfSentToClient["processors"].(map[string]interface{})
|
||||
collectorConfProcessors := collectorConfSentToClient["processors"].(map[string]interface{})
|
||||
for _, procName := range expectedLogProcessorNames {
|
||||
_, procExists := otelConfProcessors[procName]
|
||||
pipelineProcessorInConf, procExists := collectorConfProcessors[procName]
|
||||
assert.True(tb.t, procExists, fmt.Sprintf(
|
||||
"%s processor not found in config sent to opamp client", procName,
|
||||
))
|
||||
|
||||
// Validate that filter expr in collector conf is as expected.
|
||||
|
||||
// extract expr present in collector conf processor
|
||||
pipelineProcOps := pipelineProcessorInConf.(map[string]interface{})["operators"].([]interface{})
|
||||
|
||||
routerOpIdx := slices.IndexFunc(
|
||||
pipelineProcOps,
|
||||
func(op interface{}) bool { return op.(map[string]interface{})["id"] == "router_signoz" },
|
||||
)
|
||||
require.GreaterOrEqual(tb.t, routerOpIdx, 0)
|
||||
routerOproutes := pipelineProcOps[routerOpIdx].(map[string]interface{})["routes"].([]interface{})
|
||||
pipelineFilterExpr := routerOproutes[0].(map[string]interface{})["expr"].(string)
|
||||
|
||||
// find logparsingpipeline.Pipeline whose processor is being validated here
|
||||
pipelineIdx := slices.IndexFunc(
|
||||
pipelines, func(p logparsingpipeline.Pipeline) bool {
|
||||
return logparsingpipeline.CollectorConfProcessorName(p) == procName
|
||||
},
|
||||
)
|
||||
require.GreaterOrEqual(tb.t, pipelineIdx, 0)
|
||||
expectedExpr, err := queryBuilderToExpr.Parse(pipelines[pipelineIdx].Filter)
|
||||
require.Nil(tb.t, err)
|
||||
require.Equal(tb.t, expectedExpr, pipelineFilterExpr)
|
||||
}
|
||||
}
|
||||
|
||||
@ -633,8 +691,8 @@ func (conn *mockOpAmpConnection) latestMsgFromServer() *protobufs.ServerToAgent
|
||||
return conn.serverToAgentMsgs[len(conn.serverToAgentMsgs)-1]
|
||||
}
|
||||
|
||||
func (conn *mockOpAmpConnection) LatestPipelinesReceivedFromServer() ([]model.Pipeline, error) {
|
||||
pipelines := []model.Pipeline{}
|
||||
func (conn *mockOpAmpConnection) LatestPipelinesReceivedFromServer() ([]logparsingpipeline.Pipeline, error) {
|
||||
pipelines := []logparsingpipeline.Pipeline{}
|
||||
lastMsg := conn.latestMsgFromServer()
|
||||
if lastMsg == nil {
|
||||
return pipelines, nil
|
||||
|
Loading…
x
Reference in New Issue
Block a user