chore: use last 1day data for apdex latency metric meta (#4846)

This commit is contained in:
Srikanth Chekuri 2024-04-15 13:37:08 +05:30 committed by GitHub
parent 57bfdedfe1
commit a9464de62d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 55 additions and 1634 deletions

View File

@ -152,7 +152,6 @@ func (ah *APIHandler) RegisterRoutes(router *mux.Router, am *baseapp.AuthMiddlew
router.HandleFunc("/api/v1/register", am.OpenAccess(ah.registerUser)).Methods(http.MethodPost) router.HandleFunc("/api/v1/register", am.OpenAccess(ah.registerUser)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/login", am.OpenAccess(ah.loginUser)).Methods(http.MethodPost) router.HandleFunc("/api/v1/login", am.OpenAccess(ah.loginUser)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(ah.searchTraces)).Methods(http.MethodGet) router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(ah.searchTraces)).Methods(http.MethodGet)
router.HandleFunc("/api/v2/metrics/query_range", am.ViewAccess(ah.queryRangeMetricsV2)).Methods(http.MethodPost)
// PAT APIs // PAT APIs
router.HandleFunc("/api/v1/pats", am.AdminAccess(ah.createPAT)).Methods(http.MethodPost) router.HandleFunc("/api/v1/pats", am.AdminAccess(ah.createPAT)).Methods(http.MethodPost)

View File

@ -1,236 +0,0 @@
package api
import (
"bytes"
"fmt"
"net/http"
"sync"
"text/template"
"time"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/app/parser"
"go.signoz.io/signoz/pkg/query-service/constants"
basemodel "go.signoz.io/signoz/pkg/query-service/model"
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
"go.uber.org/zap"
)
func (ah *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request) {
if !ah.CheckFeature(basemodel.CustomMetricsFunction) {
zap.L().Info("CustomMetricsFunction feature is not enabled in this plan")
ah.APIHandler.QueryRangeMetricsV2(w, r)
return
}
metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r)
if apiErrorObj != nil {
zap.L().Error("Error in parsing metric query params", zap.Error(apiErrorObj.Err))
RespondError(w, apiErrorObj, nil)
return
}
// prometheus instant query needs same timestamp
if metricsQueryRangeParams.CompositeMetricQuery.PanelType == basemodel.QUERY_VALUE &&
metricsQueryRangeParams.CompositeMetricQuery.QueryType == basemodel.PROM {
metricsQueryRangeParams.Start = metricsQueryRangeParams.End
}
// round up the end to nearest multiple
if metricsQueryRangeParams.CompositeMetricQuery.QueryType == basemodel.QUERY_BUILDER {
end := (metricsQueryRangeParams.End) / 1000
step := metricsQueryRangeParams.Step
metricsQueryRangeParams.End = (end / step * step) * 1000
}
type channelResult struct {
Series []*basemodel.Series
TableName string
Err error
Name string
Query string
}
execClickHouseQueries := func(queries map[string]string) ([]*basemodel.Series, []string, error, map[string]string) {
var seriesList []*basemodel.Series
var tableName []string
ch := make(chan channelResult, len(queries))
var wg sync.WaitGroup
for name, query := range queries {
wg.Add(1)
go func(name, query string) {
defer wg.Done()
seriesList, tableName, err := ah.opts.DataConnector.GetMetricResultEE(r.Context(), query)
for _, series := range seriesList {
series.QueryName = name
}
if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
return
}
ch <- channelResult{Series: seriesList, TableName: tableName}
}(name, query)
}
wg.Wait()
close(ch)
var errs []error
errQuriesByName := make(map[string]string)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
seriesList = append(seriesList, r.Series...)
tableName = append(tableName, r.TableName)
}
if len(errs) != 0 {
return nil, nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName
}
return seriesList, tableName, nil, nil
}
execPromQueries := func(metricsQueryRangeParams *basemodel.QueryRangeParamsV2) ([]*basemodel.Series, error, map[string]string) {
var seriesList []*basemodel.Series
ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeMetricQuery.PromQueries))
var wg sync.WaitGroup
for name, query := range metricsQueryRangeParams.CompositeMetricQuery.PromQueries {
if query.Disabled {
continue
}
wg.Add(1)
go func(name string, query *basemodel.PromQuery) {
var seriesList []*basemodel.Series
defer wg.Done()
tmpl := template.New("promql-query")
tmpl, tmplErr := tmpl.Parse(query.Query)
if tmplErr != nil {
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
return
}
var queryBuf bytes.Buffer
tmplErr = tmpl.Execute(&queryBuf, metricsQueryRangeParams.Variables)
if tmplErr != nil {
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
return
}
query.Query = queryBuf.String()
queryModel := basemodel.QueryRangeParams{
Start: time.UnixMilli(metricsQueryRangeParams.Start),
End: time.UnixMilli(metricsQueryRangeParams.End),
Step: time.Duration(metricsQueryRangeParams.Step * int64(time.Second)),
Query: query.Query,
}
promResult, _, err := ah.opts.DataConnector.GetQueryRangeResult(r.Context(), &queryModel)
if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query.Query}
return
}
matrix, _ := promResult.Matrix()
for _, v := range matrix {
var s basemodel.Series
s.QueryName = name
s.Labels = v.Metric.Copy().Map()
for _, p := range v.Floats {
s.Points = append(s.Points, basemodel.MetricPoint{Timestamp: p.T, Value: p.F})
}
seriesList = append(seriesList, &s)
}
ch <- channelResult{Series: seriesList}
}(name, query)
}
wg.Wait()
close(ch)
var errs []error
errQuriesByName := make(map[string]string)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
seriesList = append(seriesList, r.Series...)
}
if len(errs) != 0 {
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName
}
return seriesList, nil, nil
}
var seriesList []*basemodel.Series
var tableName []string
var err error
var errQuriesByName map[string]string
switch metricsQueryRangeParams.CompositeMetricQuery.QueryType {
case basemodel.QUERY_BUILDER:
runQueries := metrics.PrepareBuilderMetricQueries(metricsQueryRangeParams, constants.SIGNOZ_TIMESERIES_TABLENAME)
if runQueries.Err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: runQueries.Err}, nil)
return
}
seriesList, tableName, err, errQuriesByName = execClickHouseQueries(runQueries.Queries)
case basemodel.CLICKHOUSE:
queries := make(map[string]string)
for name, chQuery := range metricsQueryRangeParams.CompositeMetricQuery.ClickHouseQueries {
if chQuery.Disabled {
continue
}
tmpl := template.New("clickhouse-query")
tmpl, err := tmpl.Parse(chQuery.Query)
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
return
}
var query bytes.Buffer
// replace go template variables
querytemplate.AssignReservedVars(metricsQueryRangeParams)
err = tmpl.Execute(&query, metricsQueryRangeParams.Variables)
if err != nil {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, nil)
return
}
queries[name] = query.String()
}
seriesList, tableName, err, errQuriesByName = execClickHouseQueries(queries)
case basemodel.PROM:
seriesList, err, errQuriesByName = execPromQueries(metricsQueryRangeParams)
default:
err = fmt.Errorf("invalid query type")
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}, errQuriesByName)
return
}
if err != nil {
apiErrObj := &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
if metricsQueryRangeParams.CompositeMetricQuery.PanelType == basemodel.QUERY_VALUE &&
len(seriesList) > 1 &&
(metricsQueryRangeParams.CompositeMetricQuery.QueryType == basemodel.QUERY_BUILDER ||
metricsQueryRangeParams.CompositeMetricQuery.QueryType == basemodel.CLICKHOUSE) {
RespondError(w, &basemodel.ApiError{Typ: basemodel.ErrorBadData, Err: fmt.Errorf("invalid: query resulted in more than one series for value type")}, nil)
return
}
type ResponseFormat struct {
ResultType string `json:"resultType"`
Result []*basemodel.Series `json:"result"`
TableName []string `json:"tableName"`
}
resp := ResponseFormat{ResultType: "matrix", Result: seriesList, TableName: tableName}
ah.Respond(w, resp)
}

View File

@ -329,7 +329,6 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler) (*http.Server, e
r.Use(loggingMiddleware) r.Use(loggingMiddleware)
apiHandler.RegisterRoutes(r, am) apiHandler.RegisterRoutes(r, am)
apiHandler.RegisterMetricsRoutes(r, am)
apiHandler.RegisterLogsRoutes(r, am) apiHandler.RegisterLogsRoutes(r, am)
apiHandler.RegisterIntegrationRoutes(r, am) apiHandler.RegisterIntegrationRoutes(r, am)
apiHandler.RegisterQueryRangeV3Routes(r, am) apiHandler.RegisterQueryRangeV3Routes(r, am)

View File

@ -1,4 +1,4 @@
import axios from 'api'; import { ApiV4Instance } from 'api';
import { AxiosResponse } from 'axios'; import { AxiosResponse } from 'axios';
import { MetricMetaProps } from 'types/api/metrics/getApDex'; import { MetricMetaProps } from 'types/api/metrics/getApDex';
@ -6,4 +6,6 @@ export const getMetricMeta = (
metricName: string, metricName: string,
servicename: string, servicename: string,
): Promise<AxiosResponse<MetricMetaProps>> => ): Promise<AxiosResponse<MetricMetaProps>> =>
axios.get(`/metric_meta?metricName=${metricName}&serviceName=${servicename}`); ApiV4Instance.get(
`/metric/metric_metadata?metricName=${metricName}&serviceName=${servicename}`,
);

View File

@ -1,27 +0,0 @@
import { ApiV2Instance as axios } from 'api';
import { ErrorResponseHandler } from 'api/ErrorResponseHandler';
import { AxiosError } from 'axios';
import { ErrorResponse, SuccessResponse } from 'types/api';
import {
MetricNameProps,
MetricNamesPayloadProps,
} from 'types/api/metrics/getMetricName';
export const getMetricName = async (
props: MetricNameProps,
): Promise<SuccessResponse<MetricNamesPayloadProps> | ErrorResponse> => {
try {
const response = await axios.get(
`/metrics/autocomplete/list?match=${props || ''}`,
);
return {
statusCode: 200,
error: null,
message: response.data.status,
payload: response.data,
};
} catch (error) {
return ErrorResponseHandler(error as AxiosError);
}
};

View File

@ -1,6 +1,7 @@
import { ApiV2Instance as axios } from 'api'; import { ApiV3Instance as axios } from 'api';
import { ErrorResponseHandler } from 'api/ErrorResponseHandler'; import { ErrorResponseHandler } from 'api/ErrorResponseHandler';
import { AxiosError } from 'axios'; import { AxiosError } from 'axios';
import createQueryParams from 'lib/createQueryParams';
import { ErrorResponse, SuccessResponse } from 'types/api'; import { ErrorResponse, SuccessResponse } from 'types/api';
import { import {
TagKeyProps, TagKeyProps,
@ -8,15 +9,19 @@ import {
TagValueProps, TagValueProps,
TagValuesPayloadProps, TagValuesPayloadProps,
} from 'types/api/metrics/getResourceAttributes'; } from 'types/api/metrics/getResourceAttributes';
import { DataSource, MetricAggregateOperator } from 'types/common/queryBuilder';
export const getResourceAttributesTagKeys = async ( export const getResourceAttributesTagKeys = async (
props: TagKeyProps, props: TagKeyProps,
): Promise<SuccessResponse<TagKeysPayloadProps> | ErrorResponse> => { ): Promise<SuccessResponse<TagKeysPayloadProps> | ErrorResponse> => {
try { try {
const response = await axios.get( const response = await axios.get(
`/metrics/autocomplete/tagKey?metricName=${props.metricName}${ `/autocomplete/attribute_keys?${createQueryParams({
props.match ? `&match=${props.match}` : '' aggregateOperator: MetricAggregateOperator.RATE,
}`, searchText: props.match,
dataSource: DataSource.METRICS,
aggregateAttribute: props.metricName,
})}`,
); );
return { return {
@ -35,7 +40,13 @@ export const getResourceAttributesTagValues = async (
): Promise<SuccessResponse<TagValuesPayloadProps> | ErrorResponse> => { ): Promise<SuccessResponse<TagValuesPayloadProps> | ErrorResponse> => {
try { try {
const response = await axios.get( const response = await axios.get(
`/metrics/autocomplete/tagValue?metricName=${props.metricName}&tagKey=${props.tagKey}`, `/autocomplete/attribute_values?${createQueryParams({
aggregateOperator: MetricAggregateOperator.RATE,
dataSource: DataSource.METRICS,
aggregateAttribute: props.metricName,
attributeKey: props.tagKey,
searchText: '',
})}`,
); );
return { return {

View File

@ -1,3 +1,5 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { DownloadOptions } from 'container/Download/Download.types'; import { DownloadOptions } from 'container/Download/Download.types';
import { MenuItemKeys } from 'container/GridCardLayout/WidgetHeader/contants'; import { MenuItemKeys } from 'container/GridCardLayout/WidgetHeader/contants';
@ -20,7 +22,7 @@ export enum FORMULA {
ERROR_PERCENTAGE = 'A*100/B', ERROR_PERCENTAGE = 'A*100/B',
DATABASE_CALLS_AVG_DURATION = 'A/B', DATABASE_CALLS_AVG_DURATION = 'A/B',
APDEX_TRACES = '((B + C)/2)/A', APDEX_TRACES = '((B + C)/2)/A',
APDEX_DELTA_SPAN_METRICS = '(B + C/2)/A', APDEX_DELTA_SPAN_METRICS = '((B + C)/2)/A',
APDEX_CUMULATIVE_SPAN_METRICS = '((B + C)/2)/A', APDEX_CUMULATIVE_SPAN_METRICS = '((B + C)/2)/A',
} }

View File

@ -33,6 +33,8 @@ export const getNearestHighestBucketValue = (
value: number, value: number,
buckets: number[], buckets: number[],
): string => { ): string => {
// sort the buckets
buckets.sort((a, b) => a - b);
const nearestBucket = buckets.find((bucket) => bucket >= value); const nearestBucket = buckets.find((bucket) => bucket >= value);
return nearestBucket?.toString() || '+Inf'; return nearestBucket?.toString() || '+Inf';
}; };

View File

@ -109,7 +109,11 @@ export const GetTagKeys = async (): Promise<IOption[]> => {
if (!payload || !payload?.data) { if (!payload || !payload?.data) {
return []; return [];
} }
return payload.data
const keys =
payload.data.attributeKeys?.map((attributeKey) => attributeKey.key) || [];
return keys
.filter((tagKey: string) => tagKey !== 'resource_deployment_environment') .filter((tagKey: string) => tagKey !== 'resource_deployment_environment')
.map((tagKey: string) => ({ .map((tagKey: string) => ({
label: convertMetricKeyToTrace(tagKey), label: convertMetricKeyToTrace(tagKey),
@ -125,7 +129,9 @@ export const getEnvironmentTagKeys = async (): Promise<IOption[]> => {
if (!payload || !payload?.data) { if (!payload || !payload?.data) {
return []; return [];
} }
return payload.data.map((tagKey: string) => ({ const keys =
payload.data.attributeKeys?.map((attributeKey) => attributeKey.key) || [];
return keys.map((tagKey: string) => ({
label: convertMetricKeyToTrace(tagKey), label: convertMetricKeyToTrace(tagKey),
value: tagKey, value: tagKey,
})); }));
@ -140,7 +146,10 @@ export const getEnvironmentTagValues = async (): Promise<IOption[]> => {
if (!payload || !payload?.data) { if (!payload || !payload?.data) {
return []; return [];
} }
return payload.data.map((tagValue: string) => ({
const values = payload.data.stringAttributeValues || [];
return values.map((tagValue: string) => ({
label: tagValue, label: tagValue,
value: tagValue, value: tagValue,
})); }));
@ -155,7 +164,10 @@ export const GetTagValues = async (tagKey: string): Promise<IOption[]> => {
if (!payload || !payload?.data) { if (!payload || !payload?.data) {
return []; return [];
} }
return payload.data.map((tagValue: string) => ({
const values = payload.data.stringAttributeValues || [];
return values.map((tagValue: string) => ({
label: tagValue, label: tagValue,
value: tagValue, value: tagValue,
})); }));

View File

@ -26,7 +26,7 @@ export const handlers = [
), ),
rest.get( rest.get(
'http://localhost/api/v2/metrics/autocomplete/tagKey', 'http://localhost/api/v3/autocomplete/attribute_keys',
(req, res, ctx) => { (req, res, ctx) => {
const metricName = req.url.searchParams.get('metricName'); const metricName = req.url.searchParams.get('metricName');
const match = req.url.searchParams.get('match'); const match = req.url.searchParams.get('match');
@ -43,7 +43,7 @@ export const handlers = [
), ),
rest.get( rest.get(
'http://localhost/api/v2/metrics/autocomplete/tagValue', 'http://localhost/api/v3/autocomplete/attribute_values',
(req, res, ctx) => { (req, res, ctx) => {
// ?metricName=signoz_calls_total&tagKey=resource_signoz_collector_id // ?metricName=signoz_calls_total&tagKey=resource_signoz_collector_id
const metricName = req.url.searchParams.get('metricName'); const metricName = req.url.searchParams.get('metricName');

View File

@ -1,9 +1,12 @@
import { IAttributeValuesResponse } from '../queryBuilder/getAttributesValues';
import { IQueryAutocompleteResponse } from '../queryBuilder/queryAutocompleteResponse';
export type TagKeyProps = { export type TagKeyProps = {
match?: string; match?: string;
metricName: string; metricName: string;
}; };
export type TagKeysPayloadProps = { export type TagKeysPayloadProps = {
data: string[]; data: IQueryAutocompleteResponse;
}; };
export type TagValueProps = { export type TagValueProps = {
@ -11,5 +14,5 @@ export type TagValueProps = {
metricName: string; metricName: string;
}; };
export type TagValuesPayloadProps = { export type TagValuesPayloadProps = {
data: string[]; data: IAttributeValuesResponse;
}; };

View File

@ -33,15 +33,3 @@ func (aH *APIHandler) getApdexSettings(w http.ResponseWriter, r *http.Request) {
aH.WriteJSON(w, r, apdexSet) aH.WriteJSON(w, r, apdexSet)
} }
func (aH *APIHandler) getLatencyMetricMetadata(w http.ResponseWriter, r *http.Request) {
metricName := r.URL.Query().Get("metricName")
serviceName := r.URL.Query().Get("serviceName")
metricMetadata, err := aH.reader.GetLatencyMetricMetadata(r.Context(), metricName, serviceName, aH.preferDelta)
if err != nil {
RespondError(w, &model.ApiError{Err: err, Typ: model.ErrorInternal}, nil)
return
}
aH.WriteJSON(w, r, metricMetadata)
}

View File

@ -3071,117 +3071,6 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode
} }
} }
func (r *ClickHouseReader) GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) {
var query string
var err error
var tagKeyList []string
var rows driver.Rows
tagsWhereClause := ""
for key, val := range params.MetricTags {
tagsWhereClause += fmt.Sprintf(" AND JSONExtractString(labels, '%s') = '%s' ", key, val)
}
// "select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from signoz_metrics.time_series WHERE JSONExtractString(labels,'__name__')='node_udp_queues')) WHERE distinctTagKeys ILIKE '%host%';"
if len(params.Match) != 0 {
query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE metric_name=$1 %s)) WHERE distinctTagKeys ILIKE $2;", signozMetricDBName, signozTSTableName, tagsWhereClause)
rows, err = r.db.Query(ctx, query, params.MetricName, fmt.Sprintf("%%%s%%", params.Match))
} else {
query = fmt.Sprintf("select distinctTagKeys from (SELECT DISTINCT arrayJoin(tagKeys) distinctTagKeys from (SELECT DISTINCT(JSONExtractKeys(labels)) tagKeys from %s.%s WHERE metric_name=$1 %s ));", signozMetricDBName, signozTSTableName, tagsWhereClause)
rows, err = r.db.Query(ctx, query, params.MetricName)
}
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
defer rows.Close()
var tagKey string
for rows.Next() {
if err := rows.Scan(&tagKey); err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
tagKeyList = append(tagKeyList, tagKey)
}
return &tagKeyList, nil
}
func (r *ClickHouseReader) GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError) {
var query string
var err error
var tagValueList []string
var rows driver.Rows
tagsWhereClause := ""
for key, val := range params.MetricTags {
tagsWhereClause += fmt.Sprintf(" AND JSONExtractString(labels, '%s') = '%s' ", key, val)
}
if len(params.Match) != 0 {
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, '%s')) from %s.%s WHERE metric_name=$1 %s AND JSONExtractString(labels, '%s') ILIKE $2;", params.TagKey, signozMetricDBName, signozTSTableName, tagsWhereClause, params.TagKey)
rows, err = r.db.Query(ctx, query, params.TagKey, params.MetricName, fmt.Sprintf("%%%s%%", params.Match))
} else {
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, '%s')) FROM %s.%s WHERE metric_name=$2 %s;", params.TagKey, signozMetricDBName, signozTSTableName, tagsWhereClause)
rows, err = r.db.Query(ctx, query, params.TagKey, params.MetricName)
}
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
defer rows.Close()
var tagValue string
for rows.Next() {
if err := rows.Scan(&tagValue); err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
tagValueList = append(tagValueList, tagValue)
}
return &tagValueList, nil
}
func (r *ClickHouseReader) GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError) {
var query string
var err error
var metricNameList []string
var rows driver.Rows
query = fmt.Sprintf("SELECT DISTINCT(metric_name) from %s.%s WHERE metric_name ILIKE $1", signozMetricDBName, signozTSTableName)
if limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", limit)
}
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", matchText))
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
defer rows.Close()
var metricName string
for rows.Next() {
if err := rows.Scan(&metricName); err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
}
metricNameList = append(metricNameList, metricName)
}
return &metricNameList, nil
}
func (r *ClickHouseReader) GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error) { func (r *ClickHouseReader) GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error) {
zap.L().Error("GetMetricResultEE is not implemented for opensource version") zap.L().Error("GetMetricResultEE is not implemented for opensource version")
return nil, "", fmt.Errorf("GetMetricResultEE is not implemented for opensource version") return nil, "", fmt.Errorf("GetMetricResultEE is not implemented for opensource version")
@ -4165,66 +4054,15 @@ func (r *ClickHouseReader) GetMetricAttributeValues(ctx context.Context, req *v3
return &attributeValues, nil return &attributeValues, nil
} }
func (r *ClickHouseReader) GetLatencyMetricMetadata(ctx context.Context, metricName, serviceName string, preferDelta bool) (*v3.LatencyMetricMetadataResponse, error) {
query := fmt.Sprintf("SELECT DISTINCT(temporality) from %s.%s WHERE metric_name='%s' AND JSONExtractString(labels, 'service_name') = '%s'", signozMetricDBName, signozTSTableName, metricName, serviceName)
rows, err := r.db.Query(ctx, query, metricName)
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
var deltaExists bool
for rows.Next() {
var temporality string
if err := rows.Scan(&temporality); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
if temporality == string(v3.Delta) {
deltaExists = true
}
}
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, 'le')) as le from %s.%s WHERE metric_name='%s' AND JSONExtractString(labels, 'service_name') = '%s' ORDER BY le", signozMetricDBName, signozTSTableName, metricName, serviceName)
rows, err = r.db.Query(ctx, query, metricName)
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
var leFloat64 []float64
for rows.Next() {
var leStr string
if err := rows.Scan(&leStr); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
le, err := strconv.ParseFloat(leStr, 64)
// ignore the error and continue if the value is not a float
// ideally this should not happen but we have seen ClickHouse
// returning empty string for some values
if err != nil {
zap.L().Error("error while parsing le value", zap.Error(err))
continue
}
if math.IsInf(le, 0) {
continue
}
leFloat64 = append(leFloat64, le)
}
return &v3.LatencyMetricMetadataResponse{
Delta: deltaExists && preferDelta,
Le: leFloat64,
}, nil
}
func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, serviceName string) (*v3.MetricMetadataResponse, error) { func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, serviceName string) (*v3.MetricMetadataResponse, error) {
unixMilli := common.PastDayRoundOff()
// Note: metric metadata should be accessible regardless of the time range selection // Note: metric metadata should be accessible regardless of the time range selection
// our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the // our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the
// amount of data scanned // amount of data scanned
query := fmt.Sprintf("SELECT DISTINCT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1", signozMetricDBName, signozTSTableNameV41Day) query := fmt.Sprintf("SELECT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY temporality, description, type, unit, is_monotonic", signozMetricDBName, signozTSTableNameV41Day)
rows, err := r.db.Query(ctx, query, metricName) rows, err := r.db.Query(ctx, query, metricName, unixMilli)
if err != nil { if err != nil {
zap.L().Error("Error while fetching metric metadata", zap.Error(err)) zap.L().Error("Error while fetching metric metadata", zap.Error(err))
return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error()) return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error())
@ -4242,8 +4080,8 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se
} }
} }
query = fmt.Sprintf("SELECT DISTINCT(JSONExtractString(labels, 'le')) as le from %s.%s WHERE metric_name=$1 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $2 ORDER BY le", signozMetricDBName, signozTSTableNameV41Day) query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day)
rows, err = r.db.Query(ctx, query, metricName, serviceName) rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName)
if err != nil { if err != nil {
zap.L().Error("Error while executing query", zap.Error(err)) zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error()) return nil, fmt.Errorf("error while executing query: %s", err.Error())

View File

@ -30,7 +30,6 @@ import (
logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
"go.signoz.io/signoz/pkg/query-service/app/metrics" "go.signoz.io/signoz/pkg/query-service/app/metrics"
metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsv3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3"
"go.signoz.io/signoz/pkg/query-service/app/parser"
"go.signoz.io/signoz/pkg/query-service/app/querier" "go.signoz.io/signoz/pkg/query-service/app/querier"
querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2"
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
@ -39,7 +38,6 @@ import (
"go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
querytemplate "go.signoz.io/signoz/pkg/query-service/utils/queryTemplate"
"go.uber.org/multierr" "go.uber.org/multierr"
"go.uber.org/zap" "go.uber.org/zap"
@ -326,14 +324,6 @@ func writeHttpResponse(w http.ResponseWriter, data interface{}) {
} }
} }
func (aH *APIHandler) RegisterMetricsRoutes(router *mux.Router, am *AuthMiddleware) {
subRouter := router.PathPrefix("/api/v2/metrics").Subrouter()
subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeMetricsV2)).Methods(http.MethodPost)
subRouter.HandleFunc("/autocomplete/list", am.ViewAccess(aH.metricAutocompleteMetricName)).Methods(http.MethodGet)
subRouter.HandleFunc("/autocomplete/tagKey", am.ViewAccess(aH.metricAutocompleteTagKey)).Methods(http.MethodGet)
subRouter.HandleFunc("/autocomplete/tagValue", am.ViewAccess(aH.metricAutocompleteTagValue)).Methods(http.MethodGet)
}
func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMiddleware) { func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMiddleware) {
subRouter := router.PathPrefix("/api/v3").Subrouter() subRouter := router.PathPrefix("/api/v3").Subrouter()
subRouter.HandleFunc("/autocomplete/aggregate_attributes", am.ViewAccess( subRouter.HandleFunc("/autocomplete/aggregate_attributes", am.ViewAccess(
@ -419,8 +409,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v1/settings/ingestion_key", am.AdminAccess(aH.insertIngestionKey)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ingestion_key", am.AdminAccess(aH.insertIngestionKey)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/settings/ingestion_key", am.ViewAccess(aH.getIngestionKeys)).Methods(http.MethodGet) router.HandleFunc("/api/v1/settings/ingestion_key", am.ViewAccess(aH.getIngestionKeys)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/metric_meta", am.ViewAccess(aH.getLatencyMetricMetadata)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet) router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet) router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet) router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet)
@ -495,62 +483,6 @@ func (aH *APIHandler) getRule(w http.ResponseWriter, r *http.Request) {
aH.Respond(w, ruleResponse) aH.Respond(w, ruleResponse)
} }
func (aH *APIHandler) metricAutocompleteMetricName(w http.ResponseWriter, r *http.Request) {
matchText := r.URL.Query().Get("match")
limit, err := strconv.Atoi(r.URL.Query().Get("limit"))
if err != nil {
limit = 0 // no limit
}
metricNameList, apiErrObj := aH.reader.GetMetricAutocompleteMetricNames(r.Context(), matchText, limit)
if apiErrObj != nil {
RespondError(w, apiErrObj, nil)
return
}
aH.Respond(w, metricNameList)
}
func (aH *APIHandler) metricAutocompleteTagKey(w http.ResponseWriter, r *http.Request) {
metricsAutocompleteTagKeyParams, apiErrorObj := parser.ParseMetricAutocompleteTagParams(r)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
tagKeyList, apiErrObj := aH.reader.GetMetricAutocompleteTagKey(r.Context(), metricsAutocompleteTagKeyParams)
if apiErrObj != nil {
RespondError(w, apiErrObj, nil)
return
}
aH.Respond(w, tagKeyList)
}
func (aH *APIHandler) metricAutocompleteTagValue(w http.ResponseWriter, r *http.Request) {
metricsAutocompleteTagValueParams, apiErrorObj := parser.ParseMetricAutocompleteTagParams(r)
if len(metricsAutocompleteTagValueParams.TagKey) == 0 {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("tagKey not present in params")}
RespondError(w, apiErrObj, nil)
return
}
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
}
tagValueList, apiErrObj := aH.reader.GetMetricAutocompleteTagValue(r.Context(), metricsAutocompleteTagValueParams)
if apiErrObj != nil {
RespondError(w, apiErrObj, nil)
return
}
aH.Respond(w, tagValueList)
}
func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {
metricNames := make([]string, 0) metricNames := make([]string, 0)
@ -593,215 +525,6 @@ func (aH *APIHandler) addTemporality(ctx context.Context, qp *v3.QueryRangeParam
return nil return nil
} }
func (aH *APIHandler) QueryRangeMetricsV2(w http.ResponseWriter, r *http.Request) {
metricsQueryRangeParams, apiErrorObj := parser.ParseMetricQueryRangeParams(r)
if apiErrorObj != nil {
zap.L().Error("error parsing metric query range params", zap.Error(apiErrorObj.Err))
RespondError(w, apiErrorObj, nil)
return
}
// prometheus instant query needs same timestamp
if metricsQueryRangeParams.CompositeMetricQuery.PanelType == model.QUERY_VALUE &&
metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.PROM {
metricsQueryRangeParams.Start = metricsQueryRangeParams.End
}
// round up the end to neaerest multiple
if metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.QUERY_BUILDER {
end := (metricsQueryRangeParams.End) / 1000
step := metricsQueryRangeParams.Step
metricsQueryRangeParams.End = (end / step * step) * 1000
}
type channelResult struct {
Series []*model.Series
Err error
Name string
Query string
}
execClickHouseQueries := func(queries map[string]string) ([]*model.Series, error, map[string]string) {
var seriesList []*model.Series
ch := make(chan channelResult, len(queries))
var wg sync.WaitGroup
for name, query := range queries {
wg.Add(1)
go func(name, query string) {
defer wg.Done()
seriesList, err := aH.reader.GetMetricResult(r.Context(), query)
for _, series := range seriesList {
series.QueryName = name
}
if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
return
}
ch <- channelResult{Series: seriesList}
}(name, query)
}
wg.Wait()
close(ch)
var errs []error
errQuriesByName := make(map[string]string)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
seriesList = append(seriesList, r.Series...)
}
if len(errs) != 0 {
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName
}
return seriesList, nil, nil
}
execPromQueries := func(metricsQueryRangeParams *model.QueryRangeParamsV2) ([]*model.Series, error, map[string]string) {
var seriesList []*model.Series
ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeMetricQuery.PromQueries))
var wg sync.WaitGroup
for name, query := range metricsQueryRangeParams.CompositeMetricQuery.PromQueries {
if query.Disabled {
continue
}
wg.Add(1)
go func(name string, query *model.PromQuery) {
var seriesList []*model.Series
defer wg.Done()
tmpl := template.New("promql-query")
tmpl, tmplErr := tmpl.Parse(query.Query)
if tmplErr != nil {
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
return
}
var queryBuf bytes.Buffer
tmplErr = tmpl.Execute(&queryBuf, metricsQueryRangeParams.Variables)
if tmplErr != nil {
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
return
}
query.Query = queryBuf.String()
queryModel := model.QueryRangeParams{
Start: time.UnixMilli(metricsQueryRangeParams.Start),
End: time.UnixMilli(metricsQueryRangeParams.End),
Step: time.Duration(metricsQueryRangeParams.Step * int64(time.Second)),
Query: query.Query,
}
promResult, _, err := aH.reader.GetQueryRangeResult(r.Context(), &queryModel)
if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query.Query}
return
}
matrix, _ := promResult.Matrix()
for _, v := range matrix {
var s model.Series
s.QueryName = name
s.Labels = v.Metric.Copy().Map()
for _, p := range v.Floats {
s.Points = append(s.Points, model.MetricPoint{Timestamp: p.T, Value: p.F})
}
seriesList = append(seriesList, &s)
}
ch <- channelResult{Series: seriesList}
}(name, query)
}
wg.Wait()
close(ch)
var errs []error
errQuriesByName := make(map[string]string)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
seriesList = append(seriesList, r.Series...)
}
if len(errs) != 0 {
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName
}
return seriesList, nil, nil
}
var seriesList []*model.Series
var err error
var errQuriesByName map[string]string
switch metricsQueryRangeParams.CompositeMetricQuery.QueryType {
case model.QUERY_BUILDER:
runQueries := metrics.PrepareBuilderMetricQueries(metricsQueryRangeParams, constants.SIGNOZ_TIMESERIES_TABLENAME)
if runQueries.Err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: runQueries.Err}, nil)
return
}
seriesList, err, errQuriesByName = execClickHouseQueries(runQueries.Queries)
case model.CLICKHOUSE:
queries := make(map[string]string)
for name, chQuery := range metricsQueryRangeParams.CompositeMetricQuery.ClickHouseQueries {
if chQuery.Disabled {
continue
}
tmpl := template.New("clickhouse-query")
tmpl, err := tmpl.Parse(chQuery.Query)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
var query bytes.Buffer
// replace go template variables
querytemplate.AssignReservedVars(metricsQueryRangeParams)
err = tmpl.Execute(&query, metricsQueryRangeParams.Variables)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
queries[name] = query.String()
}
seriesList, err, errQuriesByName = execClickHouseQueries(queries)
case model.PROM:
seriesList, err, errQuriesByName = execPromQueries(metricsQueryRangeParams)
default:
err = fmt.Errorf("invalid query type")
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName)
return
}
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
if metricsQueryRangeParams.CompositeMetricQuery.PanelType == model.QUERY_VALUE &&
len(seriesList) > 1 &&
(metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.QUERY_BUILDER ||
metricsQueryRangeParams.CompositeMetricQuery.QueryType == model.CLICKHOUSE) {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid: query resulted in more than one series for value type")}, nil)
return
}
type ResponseFormat struct {
ResultType string `json:"resultType"`
Result []*model.Series `json:"result"`
}
resp := ResponseFormat{ResultType: "matrix", Result: seriesList}
aH.Respond(w, resp)
}
// populateTemporality same as addTemporality but for v4 and better // populateTemporality same as addTemporality but for v4 and better
func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error { func (aH *APIHandler) populateTemporality(ctx context.Context, qp *v3.QueryRangeParamsV3) error {

View File

@ -5,60 +5,9 @@ import (
"reflect" "reflect"
"strings" "strings"
"github.com/SigNoz/govaluate"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap" "go.uber.org/zap"
) )
type RunQueries struct {
Queries map[string]string
Err error
}
var AggregateOperatorToPercentile = map[model.AggregateOperator]float64{
model.P05: 0.5,
model.P10: 0.10,
model.P20: 0.20,
model.P25: 0.25,
model.P50: 0.50,
model.P75: 0.75,
model.P90: 0.90,
model.P95: 0.95,
model.P99: 0.99,
model.HIST_QUANTILE_50: 0.50,
model.HIST_QUANTILE_75: 0.75,
model.HIST_QUANTILE_90: 0.90,
model.HIST_QUANTILE_95: 0.95,
model.HIST_QUANTILE_99: 0.99,
}
var AggregateOperatorToSQLFunc = map[model.AggregateOperator]string{
model.AVG: "avg",
model.MAX: "max",
model.MIN: "min",
model.SUM: "sum",
model.RATE_SUM: "sum",
model.RATE_AVG: "avg",
model.RATE_MAX: "max",
model.RATE_MIN: "min",
}
// See https://github.com/SigNoz/signoz/issues/2151#issuecomment-1467249056
var rateWithoutNegative = `if (runningDifference(value) < 0 OR runningDifference(ts) <= 0, nan, runningDifference(value)/runningDifference(ts))`
var SupportedFunctions = []string{"exp", "log", "ln", "exp2", "log2", "exp10", "log10", "sqrt", "cbrt", "erf", "erfc", "lgamma", "tgamma", "sin", "cos", "tan", "asin", "acos", "atan", "degrees", "radians"}
func GoValuateFuncs() map[string]govaluate.ExpressionFunction {
var GoValuateFuncs = map[string]govaluate.ExpressionFunction{}
for _, fn := range SupportedFunctions {
GoValuateFuncs[fn] = func(args ...interface{}) (interface{}, error) {
return nil, nil
}
}
return GoValuateFuncs
}
// FormattedValue formats the value to be used in clickhouse query // FormattedValue formats the value to be used in clickhouse query
func FormattedValue(v interface{}) string { func FormattedValue(v interface{}) string {
switch x := v.(type) { switch x := v.(type) {
@ -97,398 +46,6 @@ func FormattedValue(v interface{}) string {
} }
} }
// BuildMetricsTimeSeriesFilterQuery builds the sub-query to be used for filtering
// timeseries based on search criteria
func BuildMetricsTimeSeriesFilterQuery(fs *model.FilterSet, groupTags []string, metricName string, aggregateOperator model.AggregateOperator) (string, error) {
var conditions []string
conditions = append(conditions, fmt.Sprintf("metric_name = %s", FormattedValue(metricName)))
if fs != nil && len(fs.Items) != 0 {
for _, item := range fs.Items {
toFormat := item.Value
op := strings.ToLower(strings.TrimSpace(item.Operator))
// if the received value is an array for like/match op, just take the first value
if op == "like" || op == "match" || op == "nlike" || op == "nmatch" {
x, ok := item.Value.([]interface{})
if ok {
if len(x) == 0 {
continue
}
toFormat = x[0]
}
}
fmtVal := FormattedValue(toFormat)
switch op {
case "eq":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') = %s", item.Key, fmtVal))
case "neq":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') != %s", item.Key, fmtVal))
case "in":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') IN %s", item.Key, fmtVal))
case "nin":
conditions = append(conditions, fmt.Sprintf("JSONExtractString(labels, '%s') NOT IN %s", item.Key, fmtVal))
case "like":
conditions = append(conditions, fmt.Sprintf("like(JSONExtractString(labels, '%s'), %s)", item.Key, fmtVal))
case "nlike":
conditions = append(conditions, fmt.Sprintf("notLike(JSONExtractString(labels, '%s'), %s)", item.Key, fmtVal))
case "match":
conditions = append(conditions, fmt.Sprintf("match(JSONExtractString(labels, '%s'), %s)", item.Key, fmtVal))
case "nmatch":
conditions = append(conditions, fmt.Sprintf("not match(JSONExtractString(labels, '%s'), %s)", item.Key, fmtVal))
default:
return "", fmt.Errorf("unsupported operation")
}
}
}
queryString := strings.Join(conditions, " AND ")
var selectLabels string
if aggregateOperator == model.NOOP || aggregateOperator == model.RATE {
selectLabels = "labels,"
} else {
for _, tag := range groupTags {
selectLabels += fmt.Sprintf(" JSONExtractString(labels, '%s') as %s,", tag, tag)
}
}
filterSubQuery := fmt.Sprintf("SELECT %s fingerprint FROM %s.%s WHERE %s", selectLabels, constants.SIGNOZ_METRIC_DBNAME, constants.SIGNOZ_TIMESERIES_LOCAL_TABLENAME, queryString)
return filterSubQuery, nil
}
func BuildMetricQuery(qp *model.QueryRangeParamsV2, mq *model.MetricQuery, tableName string) (string, error) {
if qp.CompositeMetricQuery.PanelType == model.QUERY_VALUE && len(mq.GroupingTags) != 0 {
return "", fmt.Errorf("reduce operator cannot be applied for the query")
}
filterSubQuery, err := BuildMetricsTimeSeriesFilterQuery(mq.TagFilters, mq.GroupingTags, mq.MetricName, mq.AggregateOperator)
if err != nil {
return "", err
}
samplesTableTimeFilter := fmt.Sprintf("metric_name = %s AND timestamp_ms >= %d AND timestamp_ms <= %d", FormattedValue(mq.MetricName), qp.Start, qp.End)
// Select the aggregate value for interval
queryTmpl :=
"SELECT %s" +
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" %s as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableTimeFilter +
" GROUP BY %s" +
" ORDER BY %s ts"
tagsWithoutLe := []string{}
for _, tag := range mq.GroupingTags {
if tag != "le" {
tagsWithoutLe = append(tagsWithoutLe, tag)
}
}
groupByWithoutLe := groupBy(tagsWithoutLe...)
groupTagsWithoutLe := groupSelect(tagsWithoutLe...)
groupBy := groupBy(mq.GroupingTags...)
groupTags := groupSelect(mq.GroupingTags...)
switch mq.AggregateOperator {
case model.RATE:
// Calculate rate of change of metric for each unique time series
groupBy = "fingerprint, ts"
groupTags = "fingerprint,"
op := "max(value)" // max value should be the closest value for point in time
subQuery := fmt.Sprintf(
queryTmpl, "any(labels) as labels, "+groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags,
) // labels will be same so any should be fine
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0`
query = fmt.Sprintf(query, "labels as fullLabels,", subQuery)
return query, nil
case model.SUM_RATE:
rateGroupBy := "fingerprint, " + groupBy
rateGroupTags := "fingerprint, " + groupTags
op := "max(value)"
subQuery := fmt.Sprintf(
queryTmpl, rateGroupTags, qp.Step, op, filterSubQuery, rateGroupBy, rateGroupTags,
) // labels will be same so any should be fine
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
query = fmt.Sprintf(query, groupTags, subQuery)
query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTags, query, groupBy, groupTags)
return query, nil
case model.RATE_SUM, model.RATE_MAX, model.RATE_AVG, model.RATE_MIN:
op := fmt.Sprintf("%s(value)", AggregateOperatorToSQLFunc[mq.AggregateOperator])
subQuery := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
query := `SELECT %s ts, ` + rateWithoutNegative + `as value FROM(%s) WHERE isNaN(value) = 0`
query = fmt.Sprintf(query, groupTags, subQuery)
return query, nil
case model.P05, model.P10, model.P20, model.P25, model.P50, model.P75, model.P90, model.P95, model.P99:
op := fmt.Sprintf("quantile(%v)(value)", AggregateOperatorToPercentile[mq.AggregateOperator])
query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
return query, nil
case model.HIST_QUANTILE_50, model.HIST_QUANTILE_75, model.HIST_QUANTILE_90, model.HIST_QUANTILE_95, model.HIST_QUANTILE_99:
rateGroupBy := "fingerprint, " + groupBy
rateGroupTags := "fingerprint, " + groupTags
op := "max(value)"
subQuery := fmt.Sprintf(
queryTmpl, rateGroupTags, qp.Step, op, filterSubQuery, rateGroupBy, rateGroupTags,
) // labels will be same so any should be fine
query := `SELECT %s ts, ` + rateWithoutNegative + ` as value FROM(%s) WHERE isNaN(value) = 0`
query = fmt.Sprintf(query, groupTags, subQuery)
// filter out NaN values from the rate query as histogramQuantile doesn't support NaN values
query = fmt.Sprintf(`SELECT %s ts, sum(value) as value FROM (%s) GROUP BY %s HAVING isNaN(value) = 0 ORDER BY %s ts`, groupTags, query, groupBy, groupTags)
value := AggregateOperatorToPercentile[mq.AggregateOperator]
query = fmt.Sprintf(`SELECT %s ts, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), %.3f) as value FROM (%s) GROUP BY %s ORDER BY %s ts`, groupTagsWithoutLe, value, query, groupByWithoutLe, groupTagsWithoutLe)
return query, nil
case model.AVG, model.SUM, model.MIN, model.MAX:
op := fmt.Sprintf("%s(value)", AggregateOperatorToSQLFunc[mq.AggregateOperator])
query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
return query, nil
case model.COUNT:
op := "toFloat64(count(*))"
query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
return query, nil
case model.COUNT_DISTINCT:
op := "toFloat64(count(distinct(value)))"
query := fmt.Sprintf(queryTmpl, groupTags, qp.Step, op, filterSubQuery, groupBy, groupTags)
return query, nil
case model.NOOP:
queryTmpl :=
"SELECT fingerprint, labels as fullLabels," +
" toStartOfInterval(toDateTime(intDiv(timestamp_ms, 1000)), INTERVAL %d SECOND) as ts," +
" any(value) as value" +
" FROM " + constants.SIGNOZ_METRIC_DBNAME + "." + constants.SIGNOZ_SAMPLES_TABLENAME +
" INNER JOIN" +
" (%s) as filtered_time_series" +
" USING fingerprint" +
" WHERE " + samplesTableTimeFilter +
" GROUP BY fingerprint, labels, ts" +
" ORDER BY fingerprint, labels, ts"
query := fmt.Sprintf(queryTmpl, qp.Step, filterSubQuery)
return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator")
}
}
func groupBy(tags ...string) string {
tags = append(tags, "ts")
return strings.Join(tags, ",")
}
func groupSelect(tags ...string) string {
groupTags := strings.Join(tags, ",")
if len(tags) != 0 {
groupTags += ", "
}
return groupTags
}
// validateExpressions validates the math expressions using the list of
// allowed functions.
func validateExpressions(expressions []string, funcs map[string]govaluate.ExpressionFunction) []error {
var errs []error
for _, exp := range expressions {
_, err := govaluate.NewEvaluableExpressionWithFunctions(exp, funcs)
if err != nil {
errs = append(errs, err)
}
}
return errs
}
// FormatErrs returns formatted error string
func FormatErrs(errs []error, separator string) string {
var errStrs []string
for _, err := range errs {
errStrs = append(errStrs, err.Error())
}
return strings.Join(errStrs, separator)
}
func reduceQuery(query string, reduceTo model.ReduceToOperator, aggregateOperator model.AggregateOperator) (string, error) {
var selectLabels string
var groupBy string
// NOOP and RATE can possibly return multiple time series and reduce should be applied
// for each uniques series. When the final result contains more than one series we throw
// an error post DB fetching. Otherwise just return the single data. This is not known until queried so the
// the query is prepared accordingly.
if aggregateOperator == model.NOOP || aggregateOperator == model.RATE {
selectLabels = ", any(fullLabels) as fullLabels"
groupBy = "GROUP BY fingerprint"
}
// the timestamp picked is not relevant here since the final value used is show the single
// chart with just the query value. For the quer
switch reduceTo {
case model.RLAST:
query = fmt.Sprintf("SELECT anyLast(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
case model.RSUM:
query = fmt.Sprintf("SELECT sum(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
case model.RAVG:
query = fmt.Sprintf("SELECT avg(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
case model.RMAX:
query = fmt.Sprintf("SELECT max(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
case model.RMIN:
query = fmt.Sprintf("SELECT min(value) as value, any(ts) as ts %s FROM (%s) %s", selectLabels, query, groupBy)
default:
return "", fmt.Errorf("unsupported reduce operator")
}
return query, nil
}
// varToQuery constructs the query for each named builder block
func varToQuery(qp *model.QueryRangeParamsV2, tableName string) (map[string]string, error) {
evalFuncs := GoValuateFuncs()
varToQuery := make(map[string]string)
for _, builderQuery := range qp.CompositeMetricQuery.BuilderQueries {
// err should be nil here since the expression is already validated
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(builderQuery.Expression, evalFuncs)
// Use the parsed expression and build the query for each variable
// if not already exists
var errs []error
for _, _var := range expression.Vars() {
if _, ok := varToQuery[_var]; !ok {
mq, varExists := qp.CompositeMetricQuery.BuilderQueries[_var]
if !varExists {
errs = append(errs, fmt.Errorf("variable %s not found in builder queries", _var))
continue
}
query, err := BuildMetricQuery(qp, mq, tableName)
if err != nil {
errs = append(errs, err)
} else {
if qp.CompositeMetricQuery.PanelType == model.QUERY_VALUE {
query, err = reduceQuery(query, mq.ReduceTo, mq.AggregateOperator)
if err != nil {
errs = append(errs, err)
}
}
}
varToQuery[_var] = query
}
}
if len(errs) != 0 {
return nil, fmt.Errorf("error while creating query: %s", FormatErrs(errs, "\n"))
}
}
return varToQuery, nil
}
func unique(slice []string) []string {
keys := make(map[string]struct{})
list := []string{}
for _, entry := range slice {
if _, value := keys[entry]; !value {
keys[entry] = struct{}{}
list = append(list, entry)
}
}
return list
}
// expressionToQuery constructs the query for the expression
func expressionToQuery(qp *model.QueryRangeParamsV2, varToQuery map[string]string, expression *govaluate.EvaluableExpression) (string, error) {
var formulaQuery string
vars := unique(expression.Vars())
for idx, var_ := range vars[1:] {
x, y := vars[idx], var_
if !reflect.DeepEqual(qp.CompositeMetricQuery.BuilderQueries[x].GroupingTags, qp.CompositeMetricQuery.BuilderQueries[y].GroupingTags) {
return "", fmt.Errorf("group by must be same")
}
}
var modified []govaluate.ExpressionToken
tokens := expression.Tokens()
for idx := range tokens {
token := tokens[idx]
if token.Kind == govaluate.VARIABLE {
token.Value = fmt.Sprintf("%v.value", token.Value)
token.Meta = fmt.Sprintf("%v.value", token.Meta)
}
modified = append(modified, token)
}
// err should be nil here since the expression is already validated
formula, _ := govaluate.NewEvaluableExpressionFromTokens(modified)
var formulaSubQuery string
var joinUsing string
var prevVar string
for idx, var_ := range vars {
query := varToQuery[var_]
groupTags := qp.CompositeMetricQuery.BuilderQueries[var_].GroupingTags
groupTags = append(groupTags, "ts")
if joinUsing == "" {
for _, tag := range groupTags {
joinUsing += fmt.Sprintf("%s.%s as %s, ", var_, tag, tag)
}
joinUsing = strings.TrimSuffix(joinUsing, ", ")
}
formulaSubQuery += fmt.Sprintf("(%s) as %s ", query, var_)
if idx > 0 {
formulaSubQuery += " ON "
for _, tag := range groupTags {
formulaSubQuery += fmt.Sprintf("%s.%s = %s.%s AND ", prevVar, tag, var_, tag)
}
formulaSubQuery = strings.TrimSuffix(formulaSubQuery, " AND ")
}
if idx < len(vars)-1 {
formulaSubQuery += " INNER JOIN"
}
prevVar = var_
}
formulaQuery = fmt.Sprintf("SELECT %s, %s as value FROM ", joinUsing, formula.ExpressionString()) + formulaSubQuery
return formulaQuery, nil
}
// PrepareBuilderMetricQueries constructs the queries to be run for query range timeseries
func PrepareBuilderMetricQueries(qp *model.QueryRangeParamsV2, tableName string) *RunQueries {
evalFuncs := GoValuateFuncs()
// validate the expressions
var expressions []string
for _, bq := range qp.CompositeMetricQuery.BuilderQueries {
expressions = append(expressions, bq.Expression)
}
if errs := validateExpressions(expressions, evalFuncs); len(errs) != 0 {
return &RunQueries{Err: fmt.Errorf("invalid expressions: %s", FormatErrs(errs, "\n"))}
}
varToQuery, err := varToQuery(qp, tableName)
if err != nil {
return &RunQueries{Err: err}
}
namedQueries := make(map[string]string)
var errs []error
for _, builderQuery := range qp.CompositeMetricQuery.BuilderQueries {
if builderQuery.Disabled {
continue
}
expression, _ := govaluate.NewEvaluableExpressionWithFunctions(builderQuery.Expression, evalFuncs)
tokens := expression.Tokens()
// expression with one token is used to represent
// that there are no functions applied on query
if len(tokens) == 1 {
_var := tokens[0].Value.(string)
namedQueries[builderQuery.QueryName] = varToQuery[_var]
} else {
query, err := expressionToQuery(qp, varToQuery, expression)
if err != nil {
errs = append(errs, err)
}
namedQueries[builderQuery.QueryName] = query
}
}
if len(errs) != 0 {
return &RunQueries{Err: fmt.Errorf("errors with formulas: %s", FormatErrs(errs, "\n"))}
}
return &RunQueries{Queries: namedQueries}
}
// PromFormattedValue formats the value to be used in promql // PromFormattedValue formats the value to be used in promql
func PromFormattedValue(v interface{}) string { func PromFormattedValue(v interface{}) string {
switch x := v.(type) { switch x := v.(type) {

View File

@ -1,261 +0,0 @@
package metrics
import (
"strings"
"testing"
. "github.com/smartystreets/goconvey/convey"
"go.signoz.io/signoz/pkg/query-service/model"
)
func TestBuildQuery(t *testing.T) {
Convey("TestSimpleQueryWithName", t, func() {
q := &model.QueryRangeParamsV2{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeMetricQuery: &model.CompositeMetricQuery{
BuilderQueries: map[string]*model.MetricQuery{
"A": {
QueryName: "A",
MetricName: "name",
AggregateOperator: model.RATE_MAX,
Expression: "A",
},
},
},
}
queries := PrepareBuilderMetricQueries(q, "table").Queries
So(len(queries), ShouldEqual, 1)
So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name'")
So(queries["A"], ShouldContainSubstring, rateWithoutNegative)
})
Convey("TestSimpleQueryWithHistQuantile", t, func() {
q := &model.QueryRangeParamsV2{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeMetricQuery: &model.CompositeMetricQuery{
BuilderQueries: map[string]*model.MetricQuery{
"A": {
QueryName: "A",
MetricName: "name",
AggregateOperator: model.HIST_QUANTILE_99,
Expression: "A",
},
},
},
}
queries := PrepareBuilderMetricQueries(q, "table").Queries
So(len(queries), ShouldEqual, 1)
So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name'")
So(queries["A"], ShouldContainSubstring, rateWithoutNegative)
So(queries["A"], ShouldContainSubstring, "HAVING isNaN(value) = 0")
})
}
func TestBuildQueryWithFilters(t *testing.T) {
Convey("TestBuildQueryWithFilters", t, func() {
q := &model.QueryRangeParamsV2{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeMetricQuery: &model.CompositeMetricQuery{
BuilderQueries: map[string]*model.MetricQuery{
"A": {
QueryName: "A",
MetricName: "name",
TagFilters: &model.FilterSet{Operator: "AND", Items: []model.FilterItem{
{Key: "a", Value: "b", Operator: "neq"},
{Key: "code", Value: "ERROR_*", Operator: "nmatch"},
}},
AggregateOperator: model.RATE_MAX,
Expression: "A",
},
},
},
}
queries := PrepareBuilderMetricQueries(q, "table").Queries
So(len(queries), ShouldEqual, 1)
So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'a') != 'b'")
So(queries["A"], ShouldContainSubstring, rateWithoutNegative)
So(queries["A"], ShouldContainSubstring, "not match(JSONExtractString(labels, 'code'), 'ERROR_*')")
})
}
func TestBuildQueryWithMultipleQueries(t *testing.T) {
Convey("TestBuildQueryWithFilters", t, func() {
q := &model.QueryRangeParamsV2{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeMetricQuery: &model.CompositeMetricQuery{
BuilderQueries: map[string]*model.MetricQuery{
"A": {
QueryName: "A",
MetricName: "name",
TagFilters: &model.FilterSet{Operator: "AND", Items: []model.FilterItem{
{Key: "in", Value: []interface{}{"a", "b", "c"}, Operator: "in"},
}},
AggregateOperator: model.RATE_AVG,
Expression: "A",
},
"B": {
QueryName: "B",
MetricName: "name2",
AggregateOperator: model.RATE_MAX,
Expression: "B",
},
},
},
}
queries := PrepareBuilderMetricQueries(q, "table").Queries
So(len(queries), ShouldEqual, 2)
So(queries["A"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']")
So(queries["A"], ShouldContainSubstring, rateWithoutNegative)
})
}
func TestBuildQueryWithMultipleQueriesAndFormula(t *testing.T) {
Convey("TestBuildQueryWithFilters", t, func() {
q := &model.QueryRangeParamsV2{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeMetricQuery: &model.CompositeMetricQuery{
BuilderQueries: map[string]*model.MetricQuery{
"A": {
QueryName: "A",
MetricName: "name",
TagFilters: &model.FilterSet{Operator: "AND", Items: []model.FilterItem{
{Key: "in", Value: []interface{}{"a", "b", "c"}, Operator: "in"},
}},
AggregateOperator: model.RATE_MAX,
Expression: "A",
},
"B": {
MetricName: "name2",
AggregateOperator: model.RATE_AVG,
Expression: "B",
},
"C": {
QueryName: "C",
Expression: "A/B",
},
},
},
}
queries := PrepareBuilderMetricQueries(q, "table").Queries
So(len(queries), ShouldEqual, 3)
So(queries["C"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / B.value")
So(queries["C"], ShouldContainSubstring, "WHERE metric_name = 'name' AND JSONExtractString(labels, 'in') IN ['a','b','c']")
So(queries["C"], ShouldContainSubstring, rateWithoutNegative)
})
}
func TestBuildQueryWithIncorrectQueryRef(t *testing.T) {
Convey("TestBuildQueryWithFilters", t, func() {
q := &model.QueryRangeParamsV2{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeMetricQuery: &model.CompositeMetricQuery{
BuilderQueries: map[string]*model.MetricQuery{
"A": {
QueryName: "A",
MetricName: "name",
TagFilters: &model.FilterSet{Operator: "AND", Items: []model.FilterItem{
{Key: "in", Value: []interface{}{"a", "b", "c"}, Operator: "in"},
}},
AggregateOperator: model.RATE_MAX,
Expression: "A",
},
"C": {
QueryName: "C",
Expression: "D*2",
},
},
},
}
res := PrepareBuilderMetricQueries(q, "table")
So(res.Err, ShouldNotBeNil)
So(res.Err.Error(), ShouldContainSubstring, "variable D not found in builder queries")
})
}
func TestBuildQueryWithThreeOrMoreQueriesRefAndFormula(t *testing.T) {
Convey("TestBuildQueryWithFilters", t, func() {
q := &model.QueryRangeParamsV2{
Start: 1650991982000,
End: 1651078382000,
Step: 60,
CompositeMetricQuery: &model.CompositeMetricQuery{
BuilderQueries: map[string]*model.MetricQuery{
"A": {
QueryName: "A",
MetricName: "name",
TagFilters: &model.FilterSet{Operator: "AND", Items: []model.FilterItem{
{Key: "in", Value: []interface{}{"a", "b", "c"}, Operator: "in"},
}},
AggregateOperator: model.RATE_MAX,
Expression: "A",
Disabled: true,
},
"B": {
MetricName: "name2",
AggregateOperator: model.RATE_AVG,
Expression: "B",
Disabled: true,
},
"C": {
MetricName: "name3",
AggregateOperator: model.SUM_RATE,
Expression: "C",
Disabled: true,
},
"F1": {
QueryName: "F1",
Expression: "A/B",
},
"F2": {
QueryName: "F2",
Expression: "A/(B+C)",
},
"F3": {
QueryName: "F3",
Expression: "A*A",
},
"F4": {
QueryName: "F4",
Expression: "A*B*C",
},
"F5": {
QueryName: "F5",
Expression: "((A - B) / B) * 100",
},
},
},
}
res := PrepareBuilderMetricQueries(q, "table")
So(res.Err, ShouldBeNil)
queries := res.Queries
So(len(queries), ShouldEqual, 5)
So(queries["F1"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / B.value")
So(strings.Count(queries["F1"], " ON "), ShouldEqual, 1)
So(queries["F2"], ShouldContainSubstring, "SELECT A.ts as ts, A.value / (B.value + C.value)")
So(strings.Count(queries["F2"], " ON "), ShouldEqual, 2)
// Working with same query multiple times should not join on itself
So(queries["F3"], ShouldNotContainSubstring, " ON ")
So(queries["F4"], ShouldContainSubstring, "SELECT A.ts as ts, A.value * B.value * C.value")
// Number of times JOIN ON appears is N-1 where N is number of unique queries
So(strings.Count(queries["F4"], " ON "), ShouldEqual, 2)
So(queries["F5"], ShouldContainSubstring, "SELECT A.ts as ts, ((A.value - B.value) / B.value) * 100")
So(strings.Count(queries["F5"], " ON "), ShouldEqual, 1)
})
}

View File

@ -1,117 +0,0 @@
package parser
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/model"
)
func validateQueryRangeParamsV2(qp *model.QueryRangeParamsV2) error {
var errs []error
if !(qp.DataSource >= model.METRICS && qp.DataSource <= model.LOGS) {
errs = append(errs, fmt.Errorf("unsupported data source"))
}
if !(qp.CompositeMetricQuery.QueryType >= model.QUERY_BUILDER && qp.CompositeMetricQuery.QueryType <= model.PROM) {
errs = append(errs, fmt.Errorf("unsupported query type"))
}
if !(qp.CompositeMetricQuery.PanelType >= model.TIME_SERIES && qp.CompositeMetricQuery.PanelType <= model.QUERY_VALUE) {
errs = append(errs, fmt.Errorf("unsupported panel type"))
}
if len(errs) != 0 {
return fmt.Errorf("one or more errors found : %s", metrics.FormatErrs(errs, ","))
}
return nil
}
func ParseMetricQueryRangeParams(r *http.Request) (*model.QueryRangeParamsV2, *model.ApiError) {
var postData *model.QueryRangeParamsV2
if err := json.NewDecoder(r.Body).Decode(&postData); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
if err := validateQueryRangeParamsV2(postData); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
// prepare the variables for the corrspnding query type
formattedVars := make(map[string]interface{})
for name, value := range postData.Variables {
if postData.CompositeMetricQuery.QueryType == model.PROM {
formattedVars[name] = metrics.PromFormattedValue(value)
} else if postData.CompositeMetricQuery.QueryType == model.CLICKHOUSE {
formattedVars[name] = metrics.FormattedValue(value)
}
}
// replace the variables in metrics builder filter item with actual value
if postData.CompositeMetricQuery.QueryType == model.QUERY_BUILDER {
for _, query := range postData.CompositeMetricQuery.BuilderQueries {
if query.TagFilters == nil || len(query.TagFilters.Items) == 0 {
continue
}
for idx := range query.TagFilters.Items {
item := &query.TagFilters.Items[idx]
value := item.Value
if value != nil {
switch x := value.(type) {
case string:
variableName := strings.Trim(x, "{{ . }}")
if _, ok := postData.Variables[variableName]; ok {
item.Value = postData.Variables[variableName]
}
case []interface{}:
if len(x) > 0 {
switch x[0].(type) {
case string:
variableName := strings.Trim(x[0].(string), "{{ . }}")
if _, ok := postData.Variables[variableName]; ok {
item.Value = postData.Variables[variableName]
}
}
}
}
}
}
}
}
postData.Variables = formattedVars
return postData, nil
}
func ParseMetricAutocompleteTagParams(r *http.Request) (*model.MetricAutocompleteTagParams, *model.ApiError) {
metricName := r.URL.Query().Get("metricName")
if len(metricName) == 0 {
err := fmt.Errorf("metricName not present in params")
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
}
tagsStr := r.URL.Query().Get("tags")
// parsing tags
var tags map[string]string
if tagsStr != "" && len(tagsStr) != 0 {
err := json.Unmarshal([]byte(tagsStr), &tags)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("unable to parse tags in params: %v", err)}
}
}
matchText := r.URL.Query().Get("match")
tagKey := r.URL.Query().Get("tagKey")
metricAutocompleteTagParams := &model.MetricAutocompleteTagParams{
MetricName: metricName,
MetricTags: tags,
Match: matchText,
TagKey: tagKey,
}
return metricAutocompleteTagParams, nil
}

View File

@ -9,62 +9,11 @@ import (
"testing" "testing"
"time" "time"
"github.com/smartystreets/assertions/should"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/app/metrics"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
) )
func TestParseFilterSingleFilter(t *testing.T) {
Convey("TestParseFilterSingleFilter", t, func() {
postBody := []byte(`{
"op": "AND",
"items": [
{"key": "namespace", "value": "a", "op": "EQ"}
]
}`)
req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody))
res, _ := parseFilterSet(req)
query, _ := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP)
So(query, ShouldContainSubstring, "WHERE metric_name = 'table' AND JSONExtractString(labels, 'namespace') = 'a'")
})
}
func TestParseFilterMultipleFilter(t *testing.T) {
Convey("TestParseFilterMultipleFilter", t, func() {
postBody := []byte(`{
"op": "AND",
"items": [
{"key": "namespace", "value": "a", "op": "EQ"},
{"key": "host", "value": ["host-1", "host-2"], "op": "IN"}
]
}`)
req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody))
res, _ := parseFilterSet(req)
query, _ := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP)
So(query, should.ContainSubstring, "JSONExtractString(labels, 'host') IN ['host-1','host-2']")
So(query, should.ContainSubstring, "JSONExtractString(labels, 'namespace') = 'a'")
})
}
func TestParseFilterNotSupportedOp(t *testing.T) {
Convey("TestParseFilterNotSupportedOp", t, func() {
postBody := []byte(`{
"op": "AND",
"items": [
{"key": "namespace", "value": "a", "op": "PO"}
]
}`)
req, _ := http.NewRequest("POST", "", bytes.NewReader(postBody))
res, _ := parseFilterSet(req)
_, err := metrics.BuildMetricsTimeSeriesFilterQuery(res, []string{}, "table", model.NOOP)
So(err, should.BeError, "unsupported operation")
})
}
func TestParseAggregateAttrReques(t *testing.T) { func TestParseAggregateAttrReques(t *testing.T) {
reqCases := []struct { reqCases := []struct {
desc string desc string

View File

@ -276,7 +276,6 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
am := NewAuthMiddleware(auth.GetUserFromRequest) am := NewAuthMiddleware(auth.GetUserFromRequest)
api.RegisterRoutes(r, am) api.RegisterRoutes(r, am)
api.RegisterMetricsRoutes(r, am)
api.RegisterLogsRoutes(r, am) api.RegisterLogsRoutes(r, am)
api.RegisterIntegrationRoutes(r, am) api.RegisterIntegrationRoutes(r, am)
api.RegisterQueryRangeV3Routes(r, am) api.RegisterQueryRangeV3Routes(r, am)

View File

@ -58,9 +58,6 @@ type Reader interface {
SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) SetTTL(ctx context.Context, ttlParams *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError)
FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error)
GetMetricAutocompleteMetricNames(ctx context.Context, matchText string, limit int) (*[]string, *model.ApiError)
GetMetricAutocompleteTagKey(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
GetMetricAutocompleteTagValue(ctx context.Context, params *model.MetricAutocompleteTagParams) (*[]string, *model.ApiError)
GetMetricResult(ctx context.Context, query string) ([]*model.Series, error) GetMetricResult(ctx context.Context, query string) ([]*model.Series, error)
GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error) GetMetricResultEE(ctx context.Context, query string) ([]*model.Series, string, error)
GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) GetMetricAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error)
@ -106,7 +103,6 @@ type Reader interface {
QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error) QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error)
CheckClickHouse(ctx context.Context) error CheckClickHouse(ctx context.Context) error
GetLatencyMetricMetadata(context.Context, string, string, bool) (*v3.LatencyMetricMetadataResponse, error)
GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error) GetMetricMetadata(context.Context, string, string) (*v3.MetricMetadataResponse, error)
} }

View File

@ -3,27 +3,9 @@ package querytemplate
import ( import (
"fmt" "fmt"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
) )
// AssignReservedVars assigns values for go template vars. assumes that
// model.QueryRangeParamsV2.Start and End are Unix Nano timestamps
func AssignReservedVars(metricsQueryRangeParams *model.QueryRangeParamsV2) {
metricsQueryRangeParams.Variables["start_timestamp"] = metricsQueryRangeParams.Start / 1000
metricsQueryRangeParams.Variables["end_timestamp"] = metricsQueryRangeParams.End / 1000
metricsQueryRangeParams.Variables["start_timestamp_ms"] = metricsQueryRangeParams.Start
metricsQueryRangeParams.Variables["end_timestamp_ms"] = metricsQueryRangeParams.End
metricsQueryRangeParams.Variables["start_timestamp_nano"] = metricsQueryRangeParams.Start * 1e6
metricsQueryRangeParams.Variables["end_timestamp_nano"] = metricsQueryRangeParams.End * 1e6
metricsQueryRangeParams.Variables["start_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.Start/1000)
metricsQueryRangeParams.Variables["end_datetime"] = fmt.Sprintf("toDateTime(%d)", metricsQueryRangeParams.End/1000)
}
// AssignReservedVars assigns values for go template vars. assumes that // AssignReservedVars assigns values for go template vars. assumes that
// model.QueryRangeParamsV3.Start and End are Unix Nano timestamps // model.QueryRangeParamsV3.Start and End are Unix Nano timestamps
func AssignReservedVarsV3(metricsQueryRangeParams *v3.QueryRangeParamsV3) { func AssignReservedVarsV3(metricsQueryRangeParams *v3.QueryRangeParamsV3) {