mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 02:29:03 +08:00
feat(summary): added update metrics metadata api (#7235)
* feat(explorer): updated metadata metrics api| 7076 * feat(explorer): added inspect metrics with resource attribute| 7076 * fix(summary): fixed dashboard name in metric metadata api * fix(summary): removed offset from second query * fix(summary): removed offset from second query * feat(summary): added update metrics metadata api * feat(summary): resolved log messages * feat(summary): added is_monotonic column and added temporality| 7077 * feat(summary): added histogram bucket and summary quantile check| 7077 * feat(summary): added temporality and is_monotonic in update queries| 7077 * feat(summary): resolved pr comments| 7077 * feat(inspect): normalized resource attributes * feat(update-summary): merge conflicts resolve * feat(update-summary): merge conflicts resolve * feat(update-metrics): updated description check * feat(update-metrics): added kv log comments * fix: updated testcase with reader * fix: updated testcase with reader * fix: updated testcase with reader * fix: updated normalized true in metrics explorer api * fix: removed inner join from list metrics query
This commit is contained in:
parent
8a479d42ff
commit
5b6b5bf359
@ -304,6 +304,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
&opAmpModel.AllAgents, agentConfMgr,
|
||||
)
|
||||
|
||||
errorList := qb.PreloadMetricsMetadata(context.Background())
|
||||
for _, er := range errorList {
|
||||
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
6
go.mod
6
go.mod
@ -6,7 +6,7 @@ toolchain go1.22.7
|
||||
|
||||
require (
|
||||
dario.cat/mergo v1.0.1
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.25.0
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
|
||||
github.com/SigNoz/signoz-otel-collector v0.111.16
|
||||
@ -53,7 +53,7 @@ require (
|
||||
github.com/sethvargo/go-password v0.2.0
|
||||
github.com/smartystreets/goconvey v1.8.1
|
||||
github.com/soheilhy/cmux v0.1.5
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.9.0
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.11.0
|
||||
github.com/stretchr/testify v1.10.0
|
||||
github.com/tidwall/gjson v1.18.0
|
||||
github.com/uptrace/bun v1.2.9
|
||||
@ -92,7 +92,7 @@ require (
|
||||
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
|
||||
github.com/ClickHouse/ch-go v0.61.5 // indirect
|
||||
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b // indirect
|
||||
github.com/andybalholm/brotli v1.1.0 // indirect
|
||||
github.com/andybalholm/brotli v1.1.1 // indirect
|
||||
github.com/armon/go-metrics v0.4.1 // indirect
|
||||
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
|
||||
github.com/aws/aws-sdk-go v1.55.5 // indirect
|
||||
|
18
go.sum
18
go.sum
@ -81,8 +81,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4=
|
||||
github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.25.0 h1:rKscwqgQHzWBTZySZDcHKxgs0Ad+xFULfZvo26W5UlY=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.25.0/go.mod h1:iDTViXk2Fgvf1jn2dbJd1ys+fBkdD1UMRnXlwmhijhQ=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h2lrmGGk17dhFo=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.30.0/go.mod h1:i9ZQAojcayW3RsdCb3YR+n+wC2h65eJsZCscZ1Z1wyo=
|
||||
github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U+0KMqAA0KcU=
|
||||
github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
||||
@ -110,8 +110,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
|
||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
||||
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0=
|
||||
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs=
|
||||
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
|
||||
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
|
||||
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
|
||||
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/antonmedv/expr v1.15.3 h1:q3hOJZNvLvhqE8OHBs1cFRdbXFNKuA+bHmRaI+AmRmI=
|
||||
github.com/antonmedv/expr v1.15.3/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE=
|
||||
@ -195,8 +195,8 @@ github.com/digitalocean/godo v1.122.0 h1:ziytLQi8QKtDp2K1A+YrYl2dWLHLh2uaMzWvcz9
|
||||
github.com/digitalocean/godo v1.122.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY=
|
||||
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
|
||||
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||
github.com/docker/docker v27.2.0+incompatible h1:Rk9nIVdfH3+Vz4cyI/uhbINhEZ/oLmc+CBXmH6fbNk4=
|
||||
github.com/docker/docker v27.2.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||
github.com/docker/docker v27.3.0+incompatible h1:BNb1QY6o4JdKpqwi9IB+HUYcRRrVN4aGFUTvDmWYK1A=
|
||||
github.com/docker/docker v27.3.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
|
||||
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
@ -867,8 +867,8 @@ github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/spf13/viper v1.13.0/go.mod h1:Icm2xNL3/8uyh/wFuB1jI7TiTNKp8632Nwegu+zgdYw=
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.9.0 h1:XKr1Tb7GL1HlifKH874QGR3R6l0e6takXasROUiZawU=
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.9.0/go.mod h1:pgJm+apjvi7FHxEdgw1Bt4MRbUYpVxyhKQ/59Wkig24=
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.11.0 h1:hKY9l7SbhI4IPPs7hjKAL1iDgKc7rpfu8kx7BvehqlQ=
|
||||
github.com/srikanthccv/ClickHouse-go-mock v0.11.0/go.mod h1:CzFC21J4tLn7cEYdU5k6hg7yyf052xtZXUY2e3UF6+I=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
@ -930,6 +930,8 @@ github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23n
|
||||
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
|
||||
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c h1:3lbZUMbMiGUW/LMkfsEABsc5zNT9+b1CvsJx47JzJ8g=
|
||||
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c/go.mod h1:UrdRz5enIKZ63MEE3IF9l2/ebyx59GyGgPi+tICQdmM=
|
||||
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
|
||||
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
|
||||
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
|
||||
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
|
@ -128,6 +128,8 @@ func (middleware *Logging) getLogCommentKVs(r *http.Request) map[string]string {
|
||||
if tab == "" {
|
||||
tab = "OVER_METRICS"
|
||||
}
|
||||
} else if strings.Contains(path, "/metrics") {
|
||||
page = "metrics-explorer"
|
||||
}
|
||||
} else {
|
||||
client = "api"
|
||||
|
@ -96,12 +96,15 @@ const (
|
||||
signozTSLocalTableNameV41Week = "time_series_v4_1week"
|
||||
signozTSTableNameV41Week = "distributed_time_series_v4_1week"
|
||||
|
||||
signozTableAttributesMetadata = "distributed_attributes_metadata"
|
||||
signozLocalTableAttributesMetadata = "attributes_metadata"
|
||||
minTimespanForProgressiveSearch = time.Hour
|
||||
minTimespanForProgressiveSearchMargin = time.Minute
|
||||
maxProgressiveSteps = 4
|
||||
charset = "abcdefghijklmnopqrstuvwxyz" +
|
||||
signozTableAttributesMetadata = "distributed_attributes_metadata"
|
||||
signozLocalTableAttributesMetadata = "attributes_metadata"
|
||||
|
||||
signozUpdatedMetricsMetadataLocalTable = "updated_metadata"
|
||||
signozUpdatedMetricsMetadataTable = "distributed_updated_metadata"
|
||||
minTimespanForProgressiveSearch = time.Hour
|
||||
minTimespanForProgressiveSearchMargin = time.Minute
|
||||
maxProgressiveSteps = 4
|
||||
charset = "abcdefghijklmnopqrstuvwxyz" +
|
||||
"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||
NANOSECOND = 1000000000
|
||||
)
|
||||
@ -1140,7 +1143,7 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
|
||||
|
||||
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams,
|
||||
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
|
||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||
searchSpansResult := []model.SearchSpansResult{
|
||||
{
|
||||
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
|
||||
@ -1288,7 +1291,7 @@ func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.Sea
|
||||
|
||||
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
|
||||
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
|
||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
|
||||
|
||||
if r.useTraceNewSchema {
|
||||
return r.SearchTracesV2(ctx, params, smartTraceAlgorithm)
|
||||
@ -2883,8 +2886,22 @@ func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) {
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) FetchTemporality(ctx context.Context, metricNames []string) (map[string]map[v3.Temporality]bool, error) {
|
||||
|
||||
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
|
||||
var metricNamesToQuery []string
|
||||
for _, metricName := range metricNames {
|
||||
updatedMetadata, cacheErr := r.GetUpdatedMetricsMetadata(ctx, metricName)
|
||||
if cacheErr != nil {
|
||||
zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr))
|
||||
}
|
||||
if updatedMetadata != nil {
|
||||
if _, exists := metricNameToTemporality[metricName]; !exists {
|
||||
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
|
||||
}
|
||||
metricNameToTemporality[metricName][updatedMetadata.Temporality] = true
|
||||
} else {
|
||||
metricNamesToQuery = append(metricNamesToQuery, metricName)
|
||||
}
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day)
|
||||
|
||||
@ -3714,6 +3731,13 @@ func (r *ClickHouseReader) GetMetricAggregateAttributes(
|
||||
continue
|
||||
}
|
||||
|
||||
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName)
|
||||
if apiError == nil && metadata != nil {
|
||||
typ = string(metadata.MetricType)
|
||||
isMonotonic = metadata.IsMonotonic
|
||||
temporality = string(metadata.Temporality)
|
||||
}
|
||||
|
||||
// Non-monotonic cumulative sums are treated as gauges
|
||||
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
|
||||
typ = "Gauge"
|
||||
@ -3834,6 +3858,18 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, metricName, se
|
||||
deltaExists = true
|
||||
}
|
||||
}
|
||||
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, metricName)
|
||||
if apiError == nil && metadata != nil {
|
||||
metricType = string(metadata.MetricType)
|
||||
temporality = string(metadata.Temporality)
|
||||
if temporality == string(v3.Delta) {
|
||||
deltaExists = true
|
||||
}
|
||||
isMonotonic = metadata.IsMonotonic
|
||||
if metadata.Description != "" {
|
||||
description = metadata.Description
|
||||
}
|
||||
}
|
||||
|
||||
query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day)
|
||||
rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName)
|
||||
@ -5919,6 +5955,7 @@ func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context,
|
||||
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
|
||||
var args []interface{}
|
||||
|
||||
// Build filter conditions (if any)
|
||||
conditions, _ := utils.BuildFilterConditions(&req.Filters, "t")
|
||||
whereClause := ""
|
||||
if conditions != nil {
|
||||
@ -5940,6 +5977,7 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
|
||||
orderByClauseFirstQuery = fmt.Sprintf("ORDER BY %s %s", req.OrderBy.ColumnName, req.OrderBy.Order)
|
||||
}
|
||||
|
||||
// Determine which tables to use
|
||||
start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.EndD)
|
||||
sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.EndD)
|
||||
|
||||
@ -5953,6 +5991,8 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
|
||||
uniq(metric_name) OVER() AS total
|
||||
FROM %s.%s AS t
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
AND NOT startsWith(metric_name, 'signoz_')
|
||||
AND __normalized = true
|
||||
%s
|
||||
GROUP BY t.metric_name
|
||||
%s
|
||||
@ -5985,60 +6025,86 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
|
||||
return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
|
||||
// If no metrics were found, return early.
|
||||
if len(metricNames) == 0 {
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// Build a comma-separated list of quoted metric names.
|
||||
metricsList := "'" + strings.Join(metricNames, "', '") + "'"
|
||||
// If samples are being sorted by datapoints, update the ORDER clause.
|
||||
if dataPointsOrder {
|
||||
orderByClauseFirstQuery = fmt.Sprintf("ORDER BY s.samples %s", req.OrderBy.Order)
|
||||
} else {
|
||||
orderByClauseFirstQuery = ""
|
||||
}
|
||||
|
||||
var sampleQuery string
|
||||
var sb strings.Builder
|
||||
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
`SELECT
|
||||
if whereClause != "" {
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
`SELECT
|
||||
s.samples,
|
||||
s.metric_name,
|
||||
s.lastReceived
|
||||
FROM (
|
||||
SELECT
|
||||
dm.metric_name,
|
||||
%s AS samples,
|
||||
MAX(dm.unix_milli) AS lastReceived
|
||||
FROM %s.%s AS dm
|
||||
WHERE dm.metric_name IN (%s)
|
||||
AND dm.fingerprint IN (
|
||||
SELECT fingerprint
|
||||
FROM %s.%s
|
||||
WHERE metric_name IN (%s)
|
||||
AND __normalized = true
|
||||
%s
|
||||
GROUP BY fingerprint
|
||||
)
|
||||
AND dm.unix_milli BETWEEN ? AND ?
|
||||
GROUP BY dm.metric_name
|
||||
) AS s `,
|
||||
countExp,
|
||||
signozMetricDBName, sampleTable,
|
||||
metricsList,
|
||||
signozMetricDBName, localTsTable,
|
||||
metricsList,
|
||||
whereClause,
|
||||
))
|
||||
} else {
|
||||
// If no filters, it is a simpler query.
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
`SELECT
|
||||
s.samples,
|
||||
s.metric_name,
|
||||
s.unix_milli AS lastReceived
|
||||
s.lastReceived
|
||||
FROM (
|
||||
SELECT
|
||||
metric_name,
|
||||
%s AS samples,
|
||||
max(unix_milli) as unix_milli
|
||||
MAX(unix_milli) AS lastReceived
|
||||
FROM %s.%s
|
||||
`, countExp, signozMetricDBName, sampleTable))
|
||||
|
||||
// Conditionally add the fingerprint subquery if `whereClause` is present
|
||||
if whereClause != "" {
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
`WHERE fingerprint IN (
|
||||
SELECT fingerprint
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
%s
|
||||
AND metric_name IN (%s)
|
||||
GROUP BY fingerprint
|
||||
)
|
||||
AND metric_name IN (%s) `,
|
||||
signozMetricDBName, localTsTable, whereClause, metricsList, metricsList))
|
||||
} else {
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
`WHERE metric_name IN (%s) `, metricsList))
|
||||
WHERE metric_name IN (%s)
|
||||
AND unix_milli BETWEEN ? AND ?
|
||||
GROUP BY metric_name
|
||||
) AS s `,
|
||||
countExp,
|
||||
signozMetricDBName, sampleTable,
|
||||
metricsList))
|
||||
}
|
||||
|
||||
sb.WriteString(`GROUP BY metric_name ) AS s `)
|
||||
|
||||
// Append ORDER BY clause if provided.
|
||||
if orderByClauseFirstQuery != "" {
|
||||
sb.WriteString(orderByClauseFirstQuery)
|
||||
sb.WriteString(orderByClauseFirstQuery + " ")
|
||||
}
|
||||
|
||||
sb.WriteString(fmt.Sprintf(" LIMIT %d;", req.Limit))
|
||||
|
||||
sampleQuery := sb.String()
|
||||
// Append LIMIT clause.
|
||||
sb.WriteString(fmt.Sprintf("LIMIT %d;", req.Limit))
|
||||
sampleQuery = sb.String()
|
||||
|
||||
// Append the time boundaries for sampleQuery.
|
||||
args = append(args, start, end)
|
||||
rows, err = r.db.Query(valueCtx, sampleQuery, args...)
|
||||
if err != nil {
|
||||
@ -6078,6 +6144,7 @@ func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_
|
||||
}
|
||||
response.Metrics = filteredMetrics
|
||||
|
||||
// If ordering by samples, sort in-memory.
|
||||
if dataPointsOrder {
|
||||
sort.Slice(response.Metrics, func(i, j int) bool {
|
||||
return response.Metrics[i].Samples > response.Metrics[j].Samples
|
||||
@ -6110,9 +6177,9 @@ func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, r
|
||||
uniq(fingerprint) AS total_value,
|
||||
(SELECT uniq(fingerprint)
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ?) AS total_time_series
|
||||
WHERE unix_milli BETWEEN ? AND ? AND __normalized = true) AS total_time_series
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ? %s
|
||||
WHERE unix_milli BETWEEN ? AND ? AND NOT startsWith(metric_name, 'signoz_') AND __normalized = true %s
|
||||
GROUP BY metric_name
|
||||
)
|
||||
ORDER BY percentage DESC
|
||||
@ -6159,8 +6226,7 @@ func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, r
|
||||
func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
|
||||
var args []interface{}
|
||||
|
||||
// Build the filter conditions
|
||||
conditions, _ := utils.BuildFilterConditions(&req.Filters, "t")
|
||||
conditions, _ := utils.BuildFilterConditions(&req.Filters, "ts")
|
||||
whereClause := ""
|
||||
if conditions != nil {
|
||||
whereClause = "AND " + strings.Join(conditions, " AND ")
|
||||
@ -6170,26 +6236,23 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
|
||||
start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.EndD)
|
||||
sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.EndD)
|
||||
|
||||
// Construct the metrics query
|
||||
queryLimit := 50 + req.Limit
|
||||
metricsQuery := fmt.Sprintf(
|
||||
`SELECT
|
||||
t.metric_name AS metric_name,
|
||||
uniq(t.fingerprint) AS timeSeries
|
||||
FROM %s.%s AS t
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
ts.metric_name AS metric_name,
|
||||
uniq(ts.fingerprint) AS timeSeries
|
||||
FROM %s.%s AS ts
|
||||
WHERE NOT startsWith(ts.metric_name, 'signoz_')
|
||||
AND __normalized = true
|
||||
%s
|
||||
GROUP BY t.metric_name
|
||||
GROUP BY ts.metric_name
|
||||
ORDER BY timeSeries DESC
|
||||
LIMIT %d;`,
|
||||
signozMetricDBName, tsTable, whereClause, queryLimit,
|
||||
)
|
||||
|
||||
args = append(args, start, end)
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
|
||||
// Execute the metrics query
|
||||
rows, err := r.db.Query(valueCtx, metricsQuery, args...)
|
||||
rows, err := r.db.Query(valueCtx, metricsQuery)
|
||||
if err != nil {
|
||||
zap.L().Error("Error executing metrics query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
@ -6220,7 +6283,7 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
|
||||
// Format metric names for query
|
||||
metricsList := "'" + strings.Join(metricNames, "', '") + "'"
|
||||
|
||||
// Build query using string builder for better performance
|
||||
// Build optimized query with JOIN but `unix_milli` filter only on the sample table
|
||||
var sb strings.Builder
|
||||
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
@ -6236,46 +6299,46 @@ func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req
|
||||
FROM
|
||||
(
|
||||
SELECT
|
||||
metric_name,
|
||||
dm.metric_name,
|
||||
%s AS samples
|
||||
FROM %s.%s`,
|
||||
FROM %s.%s AS dm`,
|
||||
countExp, signozMetricDBName, sampleTable, // Total samples
|
||||
countExp, signozMetricDBName, sampleTable, // Inner select samples
|
||||
))
|
||||
|
||||
// Conditionally add the fingerprint subquery if whereClause is present
|
||||
// Apply `unix_milli` filter **only** on the sample table (`dm`)
|
||||
sb.WriteString(` WHERE dm.unix_milli BETWEEN ? AND ?`)
|
||||
|
||||
// Use JOIN instead of IN (subquery) when additional filters exist
|
||||
if whereClause != "" {
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
` WHERE fingerprint IN (
|
||||
SELECT fingerprint
|
||||
FROM %s.%s
|
||||
WHERE unix_milli BETWEEN ? AND ?
|
||||
` AND dm.fingerprint IN (
|
||||
SELECT ts.fingerprint
|
||||
FROM %s.%s AS ts
|
||||
WHERE ts.metric_name IN (%s)
|
||||
AND __normalized = true
|
||||
%s
|
||||
AND metric_name IN (%s)
|
||||
GROUP BY fingerprint
|
||||
)
|
||||
AND metric_name IN (%s)`,
|
||||
signozMetricDBName, localTsTable, whereClause, metricsList,
|
||||
metricsList,
|
||||
))
|
||||
} else {
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
` WHERE metric_name IN (%s)`,
|
||||
metricsList,
|
||||
GROUP BY ts.fingerprint
|
||||
)`,
|
||||
signozMetricDBName, localTsTable, metricsList, whereClause,
|
||||
))
|
||||
}
|
||||
|
||||
sb.WriteString(`
|
||||
GROUP BY metric_name
|
||||
// Apply metric filtering after all conditions
|
||||
sb.WriteString(fmt.Sprintf(
|
||||
` AND dm.metric_name IN (%s)
|
||||
GROUP BY dm.metric_name
|
||||
) AS s
|
||||
JOIN TotalSamples t ON 1 = 1
|
||||
ORDER BY percentage DESC
|
||||
LIMIT ?;`)
|
||||
LIMIT ?;`,
|
||||
metricsList,
|
||||
))
|
||||
|
||||
sampleQuery := sb.String()
|
||||
|
||||
// Add start and end time to args
|
||||
args = append(args, start, end)
|
||||
// Add start and end time to args (only for sample table)
|
||||
args = append(args, start, end, start, end, req.Limit)
|
||||
|
||||
// Execute the sample percentage query
|
||||
rows, err = r.db.Query(valueCtx, sampleQuery, args...)
|
||||
@ -6316,6 +6379,8 @@ func (r *ClickHouseReader) GetNameSimilarity(ctx context.Context, req *metrics_e
|
||||
FROM %s.%s
|
||||
WHERE metric_name != ?
|
||||
AND unix_milli BETWEEN ? AND ?
|
||||
AND NOT startsWith(metric_name, 'signoz_')
|
||||
AND __normalized = true
|
||||
GROUP BY metric_name
|
||||
ORDER BY name_similarity DESC
|
||||
LIMIT 30;`,
|
||||
@ -6362,6 +6427,8 @@ func (r *ClickHouseReader) GetAttributeSimilarity(ctx context.Context, req *metr
|
||||
WHERE metric_name = ?
|
||||
AND unix_milli between ? and ?
|
||||
AND NOT startsWith(kv.1, '__')
|
||||
AND NOT startsWith(metric_name, 'signoz_')
|
||||
AND __normalized = true
|
||||
GROUP BY label_key
|
||||
LIMIT 50`, signozMetricDBName, tsTable)
|
||||
|
||||
@ -6438,6 +6505,8 @@ func (r *ClickHouseReader) GetAttributeSimilarity(ctx context.Context, req *metr
|
||||
FROM %s.%s
|
||||
WHERE rand() %% 100 < 10
|
||||
AND unix_milli between ? and ?
|
||||
AND NOT startsWith(metric_name, 'signoz_')
|
||||
AND __normalized = true
|
||||
GROUP BY metric_name
|
||||
ORDER BY weighted_match_count DESC, raw_match_count DESC
|
||||
LIMIT 30
|
||||
@ -6693,3 +6762,164 @@ LIMIT 40`, // added rand to get diff value every time we run this query
|
||||
|
||||
return fingerprints, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) DeleteMetricsMetadata(ctx context.Context, metricName string) *model.ApiError {
|
||||
delQuery := fmt.Sprintf(`ALTER TABLE %s.%s DELETE WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataLocalTable)
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
err := r.db.Exec(valueCtx, delQuery, metricName)
|
||||
if err != nil {
|
||||
return &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
r.cache.Remove(ctx, constants.UpdatedMetricsMetadataCachePrefix+metricName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, req *model.UpdateMetricsMetadata) *model.ApiError {
|
||||
if req.MetricType == v3.MetricTypeHistogram {
|
||||
labels := []string{"le"}
|
||||
hasLabels, apiError := r.CheckForLabelsInMetric(ctx, req.MetricName, labels)
|
||||
if apiError != nil {
|
||||
return apiError
|
||||
}
|
||||
if !hasLabels {
|
||||
return &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("metric '%s' cannot be set as histogram type", req.MetricName),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if req.MetricType == v3.MetricTypeSummary {
|
||||
labels := []string{"quantile"}
|
||||
hasLabels, apiError := r.CheckForLabelsInMetric(ctx, req.MetricName, labels)
|
||||
if apiError != nil {
|
||||
return apiError
|
||||
}
|
||||
if !hasLabels {
|
||||
return &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("metric '%s' cannot be set as summary type", req.MetricName),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
apiErr := r.DeleteMetricsMetadata(ctx, req.MetricName)
|
||||
if apiErr != nil {
|
||||
return apiErr
|
||||
}
|
||||
insertQuery := fmt.Sprintf(`INSERT INTO %s.%s (metric_name, temporality, is_monotonic, type, description, unit, created_at)
|
||||
VALUES ( ?, ?, ?, ?, ?, ?, ?);`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
err := r.db.Exec(valueCtx, insertQuery, req.MetricName, req.Temporality, req.IsMonotonic, req.MetricType, req.Description, req.Unit, req.CreatedAt.UnixMilli())
|
||||
if err != nil {
|
||||
return &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
err = r.cache.Store(ctx, constants.UpdatedMetricsMetadataCachePrefix+req.MetricName, req, -1)
|
||||
if err != nil {
|
||||
return &model.ApiError{Typ: "CachingErr", Err: err}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricName string, labels []string) (bool, *model.ApiError) {
|
||||
if len(labels) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
conditions := "metric_name = ?"
|
||||
for range labels {
|
||||
conditions += " AND JSONHas(labels, ?) = 1"
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT count(*) > 0 as has_le
|
||||
FROM %s.%s
|
||||
WHERE %s
|
||||
LIMIT 1`, signozMetricDBName, signozTSTableNameV41Day, conditions)
|
||||
|
||||
args := make([]interface{}, 0, len(labels)+1)
|
||||
args = append(args, metricName)
|
||||
for _, label := range labels {
|
||||
args = append(args, label)
|
||||
}
|
||||
|
||||
var hasLE bool
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
err := r.db.QueryRow(valueCtx, query, args...).Scan(&hasLE)
|
||||
if err != nil {
|
||||
return false, &model.ApiError{
|
||||
Typ: "ClickHouseError",
|
||||
Err: fmt.Errorf("error checking summary labels: %v", err),
|
||||
}
|
||||
}
|
||||
return hasLE, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, metricName string) (*model.UpdateMetricsMetadata, *model.ApiError) {
|
||||
metricsMetadata := new(model.UpdateMetricsMetadata)
|
||||
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
|
||||
|
||||
// Try to get from cache first
|
||||
retrieveStatus, err := r.cache.Retrieve(ctx, cacheKey, metricsMetadata, true)
|
||||
if err == nil && retrieveStatus == cache.RetrieveStatusHit {
|
||||
return metricsMetadata, nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
zap.L().Info("Error retrieving metrics metadata from cache", zap.String("metric_name", metricName), zap.Error(err))
|
||||
} else {
|
||||
zap.L().Info("Cache miss for metrics metadata", zap.String("metric_name", metricName))
|
||||
}
|
||||
|
||||
// Query from database if cache missed
|
||||
query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
|
||||
FROM %s.%s
|
||||
WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
|
||||
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
fmt.Printf("Executing Query: %q\n", query)
|
||||
row := r.db.QueryRow(valueCtx, query, metricName)
|
||||
err = row.Scan(
|
||||
&metricsMetadata.MetricName,
|
||||
&metricsMetadata.MetricType,
|
||||
&metricsMetadata.Description,
|
||||
&metricsMetadata.Temporality,
|
||||
&metricsMetadata.IsMonotonic,
|
||||
&metricsMetadata.Unit,
|
||||
)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil, nil // No data found
|
||||
}
|
||||
return nil, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying metrics metadata: %v", err)}
|
||||
}
|
||||
|
||||
// Try caching the result, but don't return error if caching fails
|
||||
if cacheErr := r.cache.Store(ctx, cacheKey, metricsMetadata, -1); cacheErr != nil {
|
||||
zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metricName), zap.Error(cacheErr))
|
||||
}
|
||||
|
||||
return metricsMetadata, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context) []error {
|
||||
var allMetricsMetadata []model.UpdateMetricsMetadata
|
||||
var errorList []error
|
||||
// Fetch all rows from ClickHouse
|
||||
query := fmt.Sprintf(`SELECT metric_name, type, description , temporality, is_monotonic, unit
|
||||
FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
err := r.db.Select(valueCtx, &allMetricsMetadata, query)
|
||||
if err != nil {
|
||||
errorList = append(errorList, err)
|
||||
return errorList
|
||||
}
|
||||
for _, m := range allMetricsMetadata {
|
||||
err := r.cache.Store(ctx, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1)
|
||||
if err != nil {
|
||||
errorList = append(errorList, err)
|
||||
}
|
||||
}
|
||||
|
||||
return errorList
|
||||
}
|
||||
|
@ -223,7 +223,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2)
|
||||
jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2)
|
||||
pvcsRepo := inframetrics.NewPvcsRepo(opts.Reader, querierv2)
|
||||
//explorerCache := metricsexplorer.NewExplorerCache(metricsexplorer.WithCache(opts.Cache))
|
||||
summaryService := metricsexplorer.NewSummaryService(opts.Reader, opts.RuleManager)
|
||||
|
||||
aH := &APIHandler{
|
||||
@ -639,6 +638,9 @@ func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *AuthMiddlewar
|
||||
router.HandleFunc("/api/v1/metrics/inspect",
|
||||
am.ViewAccess(ah.GetInspectMetricsData)).
|
||||
Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/metrics/{metric_name}/metadata",
|
||||
am.ViewAccess(ah.UpdateMetricsMetadata)).
|
||||
Methods(http.MethodPost)
|
||||
}
|
||||
|
||||
func Intersection(a, b []int) (c []int) {
|
||||
|
@ -3,9 +3,12 @@ package metricsexplorer
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
@ -82,8 +85,71 @@ func ParseInspectMetricsParams(r *http.Request) (*metrics_explorer.InspectMetric
|
||||
if err := json.NewDecoder(r.Body).Decode(&inspectMetricParams); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||
}
|
||||
if inspectMetricParams.End-inspectMetricParams.Start > 1800000 { // half hour only
|
||||
if inspectMetricParams.End-inspectMetricParams.Start > constants.InspectMetricsMaxTimeDiff { // half hour only
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("time duration shouldn't be more than 30 mins")}
|
||||
}
|
||||
return &inspectMetricParams, nil
|
||||
}
|
||||
|
||||
func ParseUpdateMetricsMetadataParams(r *http.Request) (*metrics_explorer.UpdateMetricsMetadataRequest, *model.ApiError) {
|
||||
var updateMetricsMetadataReq metrics_explorer.UpdateMetricsMetadataRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&updateMetricsMetadataReq); err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||
}
|
||||
updateMetricsMetadataReq.MetricName = mux.Vars(r)["metric_name"]
|
||||
|
||||
switch updateMetricsMetadataReq.MetricType {
|
||||
case v3.MetricTypeSum:
|
||||
if updateMetricsMetadataReq.Temporality == "" {
|
||||
return nil, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("temporality is required when metric type is Sum"),
|
||||
}
|
||||
}
|
||||
|
||||
if updateMetricsMetadataReq.Temporality != v3.Cumulative && updateMetricsMetadataReq.Temporality != v3.Delta {
|
||||
return nil, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("invalid value for temporality"),
|
||||
}
|
||||
}
|
||||
case v3.MetricTypeHistogram:
|
||||
if updateMetricsMetadataReq.Temporality == "" {
|
||||
return nil, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("temporality is required when metric type is Histogram"),
|
||||
}
|
||||
}
|
||||
if updateMetricsMetadataReq.Temporality != v3.Cumulative && updateMetricsMetadataReq.Temporality != v3.Delta {
|
||||
return nil, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("invalid value for temporality"),
|
||||
}
|
||||
}
|
||||
case v3.MetricTypeExponentialHistogram:
|
||||
if updateMetricsMetadataReq.Temporality == "" {
|
||||
return nil, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("temporality is required when metric type is exponential histogram"),
|
||||
}
|
||||
}
|
||||
if updateMetricsMetadataReq.Temporality != v3.Cumulative && updateMetricsMetadataReq.Temporality != v3.Delta {
|
||||
return nil, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("invalid value for temporality"),
|
||||
}
|
||||
}
|
||||
|
||||
case v3.MetricTypeGauge:
|
||||
updateMetricsMetadataReq.Temporality = v3.Unspecified
|
||||
case v3.MetricTypeSummary:
|
||||
updateMetricsMetadataReq.Temporality = v3.Cumulative
|
||||
|
||||
default:
|
||||
return nil, &model.ApiError{
|
||||
Typ: model.ErrorBadData,
|
||||
Err: fmt.Errorf("invalid metric type"),
|
||||
}
|
||||
}
|
||||
return &updateMetricsMetadataReq, nil
|
||||
}
|
||||
|
@ -541,3 +541,23 @@ func (receiver *SummaryService) GetInspectMetrics(ctx context.Context, params *m
|
||||
|
||||
return baseResponse, nil
|
||||
}
|
||||
|
||||
func (receiver *SummaryService) UpdateMetricsMetadata(ctx context.Context, params *metrics_explorer.UpdateMetricsMetadataRequest) *model.ApiError {
|
||||
if params.MetricType == v3.MetricTypeSum && !params.IsMonotonic && params.Temporality == v3.Cumulative {
|
||||
params.MetricType = v3.MetricTypeGauge
|
||||
}
|
||||
metadata := model.UpdateMetricsMetadata{
|
||||
MetricName: params.MetricName,
|
||||
MetricType: params.MetricType,
|
||||
Temporality: params.Temporality,
|
||||
Unit: params.Unit,
|
||||
Description: params.Description,
|
||||
IsMonotonic: params.IsMonotonic,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
apiError := receiver.reader.UpdateMetricsMetadata(ctx, &metadata)
|
||||
if apiError != nil {
|
||||
return apiError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -228,6 +228,14 @@ func (q *querier) runBuilderQuery(
|
||||
return
|
||||
}
|
||||
|
||||
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
|
||||
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, builderQuery.AggregateAttribute.Key)
|
||||
if apiError == nil && metadata != nil {
|
||||
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(metadata.MetricType)
|
||||
builderQuery.Temporality = metadata.Temporality
|
||||
}
|
||||
}
|
||||
|
||||
// What is happening here?
|
||||
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
|
||||
// If the query is not cached, we execute the query and return the result without caching it.
|
||||
|
@ -229,6 +229,14 @@ func (q *querier) runBuilderQuery(
|
||||
return
|
||||
}
|
||||
|
||||
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
|
||||
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, builderQuery.AggregateAttribute.Key)
|
||||
if apiError == nil && metadata != nil {
|
||||
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(metadata.MetricType)
|
||||
builderQuery.Temporality = metadata.Temporality
|
||||
}
|
||||
}
|
||||
|
||||
// What is happening here?
|
||||
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
|
||||
// If the query is not cached, we execute the query and return the result without caching it.
|
||||
|
@ -254,6 +254,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
&opAmpModel.AllAgents, agentConfMgr,
|
||||
)
|
||||
|
||||
errorList := reader.PreloadMetricsMetadata(context.Background())
|
||||
for _, er := range errorList {
|
||||
zap.L().Error("preload metrics updated metadata failed", zap.Error(er))
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
@ -56,7 +56,7 @@ func (aH *APIHandler) GetMetricsDetails(w http.ResponseWriter, r *http.Request)
|
||||
ctx := r.Context()
|
||||
metricsDetail, apiError := aH.SummaryService.GetMetricsSummary(ctx, metricName)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
|
||||
zap.L().Error("error getting metrics summary error", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
@ -89,7 +89,7 @@ func (aH *APIHandler) GetTreeMap(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseTreeMapMetricsParams(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
|
||||
zap.L().Error("error parsing heatmap metric params", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
@ -109,7 +109,7 @@ func (aH *APIHandler) GetRelatedMetrics(w http.ResponseWriter, r *http.Request)
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseRelatedMetricsParams(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
|
||||
zap.L().Error("error parsing related metric params", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
@ -129,7 +129,7 @@ func (aH *APIHandler) GetInspectMetricsData(w http.ResponseWriter, r *http.Reque
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseInspectMetricsParams(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err))
|
||||
zap.L().Error("error parsing inspect metric params", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
@ -142,3 +142,23 @@ func (aH *APIHandler) GetInspectMetricsData(w http.ResponseWriter, r *http.Reque
|
||||
aH.Respond(w, result)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) UpdateMetricsMetadata(w http.ResponseWriter, r *http.Request) {
|
||||
bodyBytes, _ := io.ReadAll(r.Body)
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
ctx := r.Context()
|
||||
params, apiError := explorer.ParseUpdateMetricsMetadataParams(r)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error parsing update metrics metadata params", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
apiError = aH.SummaryService.UpdateMetricsMetadata(ctx, params)
|
||||
if apiError != nil {
|
||||
zap.L().Error("error updating metrics metadata", zap.Error(apiError.Err))
|
||||
RespondError(w, apiError, nil)
|
||||
return
|
||||
}
|
||||
aH.Respond(w, nil)
|
||||
|
||||
}
|
||||
|
@ -76,6 +76,7 @@ var TimestampSortFeature = GetOrDefaultEnv("TIMESTAMP_SORT_FEATURE", "true")
|
||||
var PreferRPMFeature = GetOrDefaultEnv("PREFER_RPM_FEATURE", "false")
|
||||
|
||||
var MetricsExplorerClickhouseThreads = GetOrDefaultEnvInt("METRICS_EXPLORER_CLICKHOUSE_THREADS", 8)
|
||||
var UpdatedMetricsMetadataCachePrefix = GetOrDefaultEnv("METRICS_UPDATED_METADATA_CACHE_KEY", "UPDATED_METRICS_METADATA")
|
||||
|
||||
// TODO(srikanthccv): remove after backfilling is done
|
||||
func UseMetricsPreAggregation() bool {
|
||||
@ -725,3 +726,5 @@ var MaterializedDataTypeMap = map[string]string{
|
||||
"int64": "number",
|
||||
"float64": "number",
|
||||
}
|
||||
|
||||
// InspectMetricsMaxTimeDiff is the maximum allowed inspect-metrics time
// window, in milliseconds (30 minutes).
const InspectMetricsMaxTimeDiff = 1800000
|
||||
|
@ -139,6 +139,12 @@ type Reader interface {
|
||||
GetMetricsAllResourceAttributes(ctx context.Context, start int64, end int64) (map[string]uint64, *model.ApiError)
|
||||
GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError)
|
||||
GetInspectMetrics(ctx context.Context, req *metrics_explorer.InspectMetricsRequest, fingerprints []string) (*metrics_explorer.InspectMetricsResponse, *model.ApiError)
|
||||
|
||||
DeleteMetricsMetadata(ctx context.Context, metricName string) *model.ApiError
|
||||
UpdateMetricsMetadata(ctx context.Context, req *model.UpdateMetricsMetadata) *model.ApiError
|
||||
GetUpdatedMetricsMetadata(ctx context.Context, metricName string) (*model.UpdateMetricsMetadata, *model.ApiError)
|
||||
|
||||
CheckForLabelsInMetric(ctx context.Context, metricName string, labels []string) (bool, *model.ApiError)
|
||||
}
|
||||
|
||||
type Querier interface {
|
||||
|
@ -160,3 +160,12 @@ type InspectMetricsRequest struct {
|
||||
// InspectMetricsResponse is the payload returned by the inspect-metrics API:
// the series for the requested metric. Series is omitted from the JSON
// output when nil.
type InspectMetricsResponse struct {
	Series *[]v3.Series `json:"series,omitempty"`
}
|
||||
|
||||
// UpdateMetricsMetadataRequest is the request body of the
// update-metrics-metadata API. MetricName is populated from the
// {metric_name} URL path variable by the parser, not from the JSON body.
type UpdateMetricsMetadataRequest struct {
	MetricName string `json:"metricName"`
	MetricType v3.MetricType `json:"metricType"`
	Description string `json:"description"`
	Unit string `json:"unit"`
	// Temporality is required for Sum/Histogram/ExponentialHistogram types;
	// it is forced by the parser for Gauge and Summary types.
	Temporality v3.Temporality `json:"temporality"`
	IsMonotonic bool `json:"isMonotonic"`
}
|
||||
|
24
pkg/query-service/model/updatedMetricsMetadata.go
Normal file
24
pkg/query-service/model/updatedMetricsMetadata.go
Normal file
@ -0,0 +1,24 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"time"
|
||||
)
|
||||
|
||||
// UpdateMetricsMetadata is one row of the updated metrics metadata table and
// the value cached per metric name. The `ch` tags map fields to ClickHouse
// columns; the `json` tags define the cache/API serialization.
type UpdateMetricsMetadata struct {
	MetricName string `json:"metricName" ch:"metric_name"`
	MetricType v3.MetricType `json:"metricType" ch:"type"`
	Description string `json:"description" ch:"description"`
	Unit string `json:"unit" ch:"unit"`
	Temporality v3.Temporality `json:"temporality" ch:"temporality"`
	IsMonotonic bool `json:"is_monotonic" ch:"is_monotonic"`
	CreatedAt time.Time `json:"created_at" ch:"created_at"`
}
|
||||
|
||||
func (c *UpdateMetricsMetadata) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(c)
|
||||
}
|
||||
func (c *UpdateMetricsMetadata) UnmarshalBinary(data []byte) error {
|
||||
return json.Unmarshal(data, c)
|
||||
}
|
@ -2,6 +2,10 @@ package rules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"go.signoz.io/signoz/pkg/cache"
|
||||
"go.signoz.io/signoz/pkg/cache/memorycache"
|
||||
"go.signoz.io/signoz/pkg/factory/factorytest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -1224,6 +1228,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
||||
for idx, c := range cases {
|
||||
rows := cmock.NewRows(cols, c.values)
|
||||
|
||||
cacheCols := make([]cmock.ColumnType, 0)
|
||||
mock.ExpectQueryRow(".*").WillReturnRow(cmock.NewRow(cacheCols, nil)).WillReturnError(sql.ErrNoRows)
|
||||
|
||||
// We are testing the eval logic after the query is run
|
||||
// so we don't care about the query string here
|
||||
queryString := "SELECT any"
|
||||
@ -1241,8 +1248,8 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
|
||||
}
|
||||
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
|
||||
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache)
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
"signoz_calls_total": {
|
||||
@ -1324,6 +1331,9 @@ func TestThresholdRuleNoData(t *testing.T) {
|
||||
for idx, c := range cases {
|
||||
rows := cmock.NewRows(cols, c.values)
|
||||
|
||||
cacheCols := make([]cmock.ColumnType, 0)
|
||||
mock.ExpectQueryRow(".*").WillReturnRow(cmock.NewRow(cacheCols, nil)).WillReturnError(sql.ErrNoRows)
|
||||
|
||||
// We are testing the eval logic after the query is run
|
||||
// so we don't care about the query string here
|
||||
queryString := "SELECT any"
|
||||
@ -1338,9 +1348,9 @@ func TestThresholdRuleNoData(t *testing.T) {
|
||||
"description": "This alert is fired when the defined metric (current value: {{$value}}) crosses the threshold ({{$threshold}})",
|
||||
"summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}",
|
||||
}
|
||||
|
||||
readerCache, err := memorycache.New(context.Background(), factorytest.NewSettings(), cache.Config{Provider: "memory", Memory: cache.Memory{TTL: DefaultFrequency}})
|
||||
options := clickhouseReader.NewOptions("", "", "archiveNamespace")
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil)
|
||||
reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), readerCache)
|
||||
|
||||
rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true)
|
||||
rule.TemporalityMap = map[string]map[v3.Temporality]bool{
|
||||
|
Loading…
x
Reference in New Issue
Block a user