From 1f04b7789724f90b4508245bdf1f92459c565df5 Mon Sep 17 00:00:00 2001
From: srikanthccv
Date: Fri, 30 May 2025 22:13:58 +0530
Subject: [PATCH] chore: some changes

---
 pkg/telemetrymetrics/field_mapper.go          |  2 +
 pkg/telemetrymetrics/internal/cumulative.go   |  1 -
 pkg/telemetrymetrics/internal/delta.go        |  1 -
 pkg/telemetrymetrics/statement_builder.go     | 69 ++++++++++++---
 pkg/telemetrymetrics/stmt_builder_test.go     | 88 ++++++++++++++++---
 pkg/telemetrymetrics/test_data.go             | 30 -------
 pkg/telemetrymetrics/testdata/keys_map.json   | 34 +++++++
 pkg/types/metrictypes/metrictypes.go          | 16 ++--
 .../querybuildertypesv5/builder_elements.go   |  2 +-
 .../telemetrytypestest/load_keys.go           | 37 ++++++++
 10 files changed, 210 insertions(+), 70 deletions(-)
 delete mode 100644 pkg/telemetrymetrics/internal/cumulative.go
 delete mode 100644 pkg/telemetrymetrics/internal/delta.go
 delete mode 100644 pkg/telemetrymetrics/test_data.go
 create mode 100644 pkg/telemetrymetrics/testdata/keys_map.json
 create mode 100644 pkg/types/telemetrytypes/telemetrytypestest/load_keys.go

diff --git a/pkg/telemetrymetrics/field_mapper.go b/pkg/telemetrymetrics/field_mapper.go
index a7f9b67c09..b80706da48 100644
--- a/pkg/telemetrymetrics/field_mapper.go
+++ b/pkg/telemetrymetrics/field_mapper.go
@@ -54,6 +54,8 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
 	case telemetrytypes.FieldContextUnspecified:
 		col, ok := timeSeriesV4Columns[key.Name]
 		if !ok {
+			// if nothing is found, return the labels column,
+			// since we keep all the labels in the labels column
 			return timeSeriesV4Columns["labels"], nil
 		}
 		return col, nil
diff --git a/pkg/telemetrymetrics/internal/cumulative.go b/pkg/telemetrymetrics/internal/cumulative.go
deleted file mode 100644
index 5bf0569ce8..0000000000
--- a/pkg/telemetrymetrics/internal/cumulative.go
+++ /dev/null
@@ -1 +0,0 @@
-package internal
diff --git a/pkg/telemetrymetrics/internal/delta.go b/pkg/telemetrymetrics/internal/delta.go
deleted file mode 100644
index 5bf0569ce8..0000000000
--- a/pkg/telemetrymetrics/internal/delta.go
+++ /dev/null
@@ -1 +0,0 @@
-package internal
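A note on the field_mapper change above: metric field keys that do not map to a dedicated time-series column fall back to the labels column, which is what later materializes as JSONExtractString(labels, '<key>') in the generated SQL (see the expected queries in the test changes below). A minimal sketch of that lookup, with hypothetical names:

// resolveColumn is a simplified, hypothetical restatement of the getColumn
// fallback: dedicated columns win, everything else is read from the labels JSON.
func resolveColumn(name string, dedicated map[string]string) string {
	if col, ok := dedicated[name]; ok {
		return col // e.g. "metric_name", "temporality"
	}
	return "labels" // e.g. "service.name" becomes JSONExtractString(labels, 'service.name')
}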
diff --git a/pkg/telemetrymetrics/statement_builder.go b/pkg/telemetrymetrics/statement_builder.go
index 0a274bb9d9..d57d08e242 100644
--- a/pkg/telemetrymetrics/statement_builder.go
+++ b/pkg/telemetrymetrics/statement_builder.go
@@ -49,11 +49,13 @@ func getKeySelectors(query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation])
 		whereClauseSelectors := querybuilder.QueryStringToKeysSelectors(query.Filter.Expression)
 		keySelectors = append(keySelectors, whereClauseSelectors...)
 	}
+
 	for idx := range query.GroupBy {
 		groupBy := query.GroupBy[idx]
 		selectors := querybuilder.QueryStringToKeysSelectors(groupBy.TelemetryFieldKey.Name)
 		keySelectors = append(keySelectors, selectors...)
 	}
+
 	for idx := range query.Order {
 		keySelectors = append(keySelectors, &telemetrytypes.FieldKeySelector{
 			Name:          query.Order[idx].Key.Name,
 			FieldContext:  query.Order[idx].Key.FieldContext,
 			FieldDataType: query.Order[idx].Key.FieldDataType,
 		})
 	}
+
 	for idx := range keySelectors {
 		keySelectors[idx].Signal = telemetrytypes.SignalMetrics
 	}
@@ -85,6 +88,27 @@ func (b *metricQueryStatementBuilder) Build(
 }

 // Fast-path (no fingerprint grouping)
+// canShortCircuitDelta reports whether we can use the optimized query for
+// the given query; it skips the GROUP BY on fingerprint, which improves
+// performance for certain queries.
+// Cases where we can short-circuit:
+// 1. time aggregation = (rate|increase) and space aggregation = sum
+//    - rate = sum(value)/step, increase = sum(value); a sum of sums is the same as the sum of all values
+//
+// 2. time aggregation = sum and space aggregation = sum
+//    - a sum of sums is the same as the sum of all values
+//
+// 3. time aggregation = min and space aggregation = min
+//    - a min of mins is the same as the min of all values
+//
+// 4. time aggregation = max and space aggregation = max
+//    - a max of maxes is the same as the max of all values
+//
+// 5. special case: exponential histograms need no per-series/fingerprint
+//    aggregation at all; we can use the quantilesDDMerge function directly
+//
+// All of this holds only for delta metrics.
 func (b *metricQueryStatementBuilder) canShortCircuitDelta(q qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]) bool {
 	if q.Aggregations[0].Temporality != metrictypes.Delta {
 		return false
 	}
@@ -126,8 +150,9 @@ func (b *metricQueryStatementBuilder) buildPipelineStatement(
 	origTimeAgg := query.Aggregations[0].TimeAggregation
 	origGroupBy := query.GroupBy

-	if query.Aggregations[0].SpaceAggregation.IsPercentile() {
-		// 1. add le in the group by if doesn't exist
+	if query.Aggregations[0].SpaceAggregation.IsPercentile() &&
+		query.Aggregations[0].Type != metrictypes.ExpHistogramType {
+		// add le to the group by if it doesn't exist
 		leExists := false
 		for _, g := range query.GroupBy {
 			if g.TelemetryFieldKey.Name == "le" {
 				leExists = true
 				break
 			}
 		}
+
+		// append le only when it is missing
 		if !leExists {
 			query.GroupBy = append(query.GroupBy, qbtypes.GroupByKey{
 				TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: "le"},
 			})
 		}

-		// 2. make the time aggregation rate and space aggregation sum
+		// make both the time aggregation and the space aggregation sum
 		query.Aggregations[0].TimeAggregation = metrictypes.TimeAggregationSum
 		query.Aggregations[0].SpaceAggregation = metrictypes.SpaceAggregationSum
 	}

-	// 1. time_series_cte
+	// time_series_cte
+	// this is applicable to all the queries
 	if frag, args, err := b.buildTimeSeriesCTE(ctx, start, end, query, keys); err != nil {
 		return nil, err
 	} else if frag != "" {
 		cteFragments = append(cteFragments, frag)
 		cteArgs = append(cteArgs, args)
 	}

 	if b.canShortCircuitDelta(query) {
-		// 2. spatial_aggregation_cte directly
+		// spatial_aggregation_cte directly for certain delta queries
 		if frag, args, err := b.buildTemporalAggDeltaFastPath(start, end, query); err != nil {
 			return nil, err
 		} else if frag != "" {
 			cteFragments = append(cteFragments, frag)
 			cteArgs = append(cteArgs, args)
 		}
 	} else {
-		// 2. temporal_aggregation_cte
+		// temporal_aggregation_cte
 		if frag, args, err := b.buildTemporalAggregationCTE(ctx, start, end, query, keys); err != nil {
 			return nil, err
 		} else if frag != "" {
 			cteFragments = append(cteFragments, frag)
 			cteArgs = append(cteArgs, args)
 		}

-		// 3. spatial_aggregation_cte
+		// spatial_aggregation_cte
 		if frag, args, err := b.buildSpatialAggregationCTE(ctx, start, end, query, keys); err != nil {
 			return nil, err
 		} else if frag != "" {

 	query.Aggregations[0].TimeAggregation = origTimeAgg
 	query.GroupBy = origGroupBy

-	// 4. final SELECT
+	// final SELECT
 	return b.buildFinalSelect(cteFragments, cteArgs, query)
 }
@@ -205,12 +233,16 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDeltaFastPath(
 		sb.SelectMore(fmt.Sprintf("`%s`", g.TelemetryFieldKey.Name))
 	}

-	aggCol := AggregationColumnForSamplesTable(start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality, query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints)
+	aggCol := AggregationColumnForSamplesTable(
+		start, end, query.Aggregations[0].Type, query.Aggregations[0].Temporality,
+		query.Aggregations[0].TimeAggregation, query.Aggregations[0].TableHints,
+	)
 	if query.Aggregations[0].TimeAggregation == metrictypes.TimeAggregationRate {
 		aggCol = fmt.Sprintf("%s/%d", aggCol, stepSec)
 	}

-	if query.Aggregations[0].SpaceAggregation.IsPercentile() {
+	if query.Aggregations[0].SpaceAggregation.IsPercentile() &&
+		query.Aggregations[0].Type == metrictypes.ExpHistogramType {
 		aggCol = fmt.Sprintf("quantilesDDMerge(0.01, %f)(sketch)[1]", query.Aggregations[0].SpaceAggregation.Percentile())
 	}
@@ -264,9 +296,17 @@ func (b *metricQueryStatementBuilder) buildTimeSeriesCTE(
 		sb.In("metric_name", query.Aggregations[0].MetricName),
 		sb.GTE("unix_milli", start),
 		sb.LTE("unix_milli", end),
-		sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()),
-		sb.EQ("__normalized", false), // TODO configurable
 	)
+
+	if query.Aggregations[0].Temporality != metrictypes.Unspecified {
+		sb.Where(sb.ILike("temporality", query.Aggregations[0].Temporality.StringValue()))
+	}
+
+	// TODO: make this configurable in case we don't roll out the new un-normalized metrics
+	sb.Where(
+		sb.EQ("__normalized", false),
+	)
+
 	if filterWhere != nil {
 		sb.AddWhereClause(filterWhere)
 	}
@@ -284,12 +324,13 @@ func (b *metricQueryStatementBuilder) buildTemporalAggregationCTE(
 	_ map[string][]*telemetrytypes.TelemetryFieldKey,
 ) (string, []any, error) {
 	if query.Aggregations[0].Temporality == metrictypes.Delta {
-		return b.buildTemporalAggDelta(start, end, query)
+		return b.buildTemporalAggDelta(ctx, start, end, query)
 	}
 	return b.buildTemporalAggCumulativeOrUnspecified(ctx, start, end, query)
 }

 func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
+	_ context.Context,
 	start, end uint64,
 	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
 ) (string, []any, error) {
@@ -329,7 +370,7 @@ func (b *metricQueryStatementBuilder) buildTemporalAggDelta(
 }

 func (b *metricQueryStatementBuilder) buildTemporalAggCumulativeOrUnspecified(
-	ctx context.Context,
+	_ context.Context,
 	start, end uint64,
 	query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
 ) (string, []any, error) {
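To make the short-circuit rules concrete, here is a standalone restatement of the decision table from the canShortCircuitDelta comment (a sketch, not part of the patch; hypothetical string-typed inputs for brevity, where the real code compares metrictypes constants):

package sketch

// shortCircuitable mirrors the five documented cases. It assumes the query
// carries exactly one aggregation, as the builder does.
func shortCircuitable(temporality, metricType, timeAgg, spaceAgg string) bool {
	if temporality != "delta" {
		return false // the fast path never applies to cumulative/unspecified metrics
	}
	switch {
	case (timeAgg == "rate" || timeAgg == "increase") && spaceAgg == "sum":
		return true // sum of per-series sums == sum over all points
	case timeAgg == "sum" && spaceAgg == "sum":
		return true
	case timeAgg == "min" && spaceAgg == "min":
		return true // min of per-series mins == min over all points
	case timeAgg == "max" && spaceAgg == "max":
		return true // max of per-series maxes == max over all points
	case metricType == "exp_histogram" && isPercentile(spaceAgg):
		return true // DD sketches merge directly via quantilesDDMerge
	}
	return false
}

func isPercentile(spaceAgg string) bool {
	switch spaceAgg {
	case "p50", "p75", "p90", "p95", "p99":
		return true
	}
	return false
}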
diff --git a/pkg/telemetrymetrics/stmt_builder_test.go b/pkg/telemetrymetrics/stmt_builder_test.go
index bae8d9765b..d6122c5ae8 100644
--- a/pkg/telemetrymetrics/stmt_builder_test.go
+++ b/pkg/telemetrymetrics/stmt_builder_test.go
@@ -2,7 +2,6 @@ package telemetrymetrics

 import (
 	"context"
-	"fmt"
 	"log/slog"
 	"testing"
 	"time"
@@ -24,7 +23,7 @@ func TestStatementBuilder(t *testing.T) {
 		expectedErr error
 	}{
 		{
-			name:        "test",
+			name:        "test_cumulative_rate_sum",
 			requestType: qbtypes.RequestTypeTimeSeries,
 			query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
 				Signal:       telemetrytypes.SignalMetrics,
 				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
 				Aggregations: []qbtypes.MetricAggregation{
 					{
 						MetricName:       "signoz_calls_total",
 						Type:             metrictypes.SumType,
 						Temporality:      metrictypes.Cumulative,
 						TimeAggregation:  metrictypes.TimeAggregationRate,
 						SpaceAggregation: metrictypes.SpaceAggregationSum,
 					},
 				},
 				Filter: &qbtypes.Filter{
 					Expression: "service.name = 'cartservice'",
 				},
 				Limit: 10,
 				GroupBy: []qbtypes.GroupByKey{
 					{
 						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
 							Name: "service.name",
 						},
 					},
 				},
 			},
 			expected: qbtypes.Statement{
-				Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
-				Args:  []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
+				Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __temporal_aggregation_cte AS (SELECT ts, `service.name`, If((per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) < 0, per_series_value, (per_series_value - lagInFrame(per_series_value, 1, 0) OVER rate_window) / (ts - lagInFrame(ts, 1, toDateTime(fromUnixTimestamp64Milli(1747947419000))) OVER rate_window)) AS per_series_value FROM (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, max(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts) WINDOW rate_window AS (PARTITION BY fingerprint ORDER BY fingerprint, ts)), __spatial_aggregation_cte AS (SELECT ts, `service.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+				Args:  []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983448000), "cumulative", false, "cartservice", "signoz_calls_total", uint64(1747947419000), uint64(1747983448000), 0},
 			},
 			expectedErr: nil,
 		},
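		// A sketch of the cumulative rate computation asserted above (illustration,
		// not part of the patch). Per fingerprint, with 30s buckets:
		//
		//   bucket t0:        per_series_value = 100
		//   bucket t1 = t0+30s: per_series_value = 130
		//   rate(t1) = (130 - 100) / 30 = 1.0
		//
		// lagInFrame(per_series_value, 1, 0) OVER rate_window fetches the previous
		// bucket's value within the fingerprint partition, and the
		// If((delta) < 0, per_series_value, delta/dt) guard handles counter resets:
		// when a counter drops (say 130 -> 5 after a restart), the raw value is used
		// instead of a negative rate.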
 		{
-			name:        "test",
+			name:        "test_delta_rate_sum",
 			requestType: qbtypes.RequestTypeTimeSeries,
 			query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
 				Signal:       telemetrytypes.SignalMetrics,
 				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
 				Aggregations: []qbtypes.MetricAggregation{
 					{
 						MetricName:       "signoz_calls_total",
 						Type:             metrictypes.SumType,
 						Temporality:      metrictypes.Delta,
 						TimeAggregation:  metrictypes.TimeAggregationRate,
 						SpaceAggregation: metrictypes.SpaceAggregationSum,
 					},
 				},
 				Filter: &qbtypes.Filter{
 					Expression: "service.name = 'cartservice'",
 				},
 				Limit: 10,
 				GroupBy: []qbtypes.GroupByKey{
 					{
 						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
 							Name: "service.name",
 						},
 					},
 				},
 			},
 			expected: qbtypes.Statement{
-				Query: "WITH __resource_filter AS (SELECT fingerprint FROM signoz_logs.distributed_logs_v2_resource WHERE (simpleJSONExtractString(labels, 'service.name') = ? AND labels LIKE ? AND labels LIKE ?) AND seen_at_ts_bucket_start >= ? AND seen_at_ts_bucket_start <= ?), __limit_cte AS (SELECT toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? GROUP BY ALL ORDER BY __result_0 DESC LIMIT ?) SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 30 SECOND) AS ts, toString(multiIf(mapContains(resources_string, 'service.name') = ?, resources_string['service.name'], NULL)) AS `service.name`, count() AS __result_0 FROM signoz_logs.distributed_logs_v2 WHERE resource_fingerprint IN (SELECT fingerprint FROM __resource_filter) AND timestamp >= ? AND timestamp <= ? AND ts_bucket_start >= ? AND ts_bucket_start <= ? AND (`service.name`) IN (SELECT `service.name` FROM __limit_cte) GROUP BY ALL",
-				Args:  []any{"cartservice", "%service.name%", "%service.name%cartservice%", uint64(1747945619), uint64(1747983448), true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448), 10, true, "1747947419000000000", "1747983448000000000", uint64(1747945619), uint64(1747983448)},
+				Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, sum(value)/30 AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+				Args:  []any{"signoz_calls_total", uint64(1747936800000), uint64(1747983448000), "delta", false, "cartservice", "signoz_calls_total", uint64(1747947419000), uint64(1747983448000)},
 			},
 			expectedErr: nil,
 		},
+		{
+			name:        "test_histogram_percentile",
+			requestType: qbtypes.RequestTypeTimeSeries,
+			query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+				Signal:       telemetrytypes.SignalMetrics,
+				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
+				Aggregations: []qbtypes.MetricAggregation{
+					{
+						MetricName:       "signoz_latency",
+						Type:             metrictypes.HistogramType,
+						Temporality:      metrictypes.Delta,
+						SpaceAggregation: metrictypes.SpaceAggregationPercentile95,
+					},
+				},
+				Filter: &qbtypes.Filter{
+					Expression: "service.name = 'cartservice'",
+				},
+				Limit: 10,
+				GroupBy: []qbtypes.GroupByKey{
+					{
+						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+							Name: "service.name",
+						},
+					},
+				},
+			},
+			expected: qbtypes.Statement{
+				Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'service.name') AS `service.name`, JSONExtractString(labels, 'le') AS `le` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND LOWER(temporality) LIKE LOWER(?) AND __normalized = ? AND JSONExtractString(labels, 'service.name') = ? GROUP BY ALL), __spatial_aggregation_cte AS (SELECT toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `service.name`, `le`, sum(value) AS value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL) SELECT ts, `service.name`, histogramQuantile(arrayMap(x -> toFloat64(x), groupArray(le)), groupArray(value), 0.950) AS value FROM __spatial_aggregation_cte GROUP BY ALL",
+				Args:  []any{"signoz_latency", uint64(1747936800000), uint64(1747983448000), "delta", false, "cartservice", "signoz_latency", uint64(1747947419000), uint64(1747983448000)},
+			},
+			expectedErr: nil,
+		},
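		// How the percentile assertion above works (illustration, not part of the
		// patch): grouping by the `le` label keeps one row per bucket boundary and
		// step, and histogramQuantile interpolates the requested quantile from the
		// (le, cumulative count) pairs. With made-up buckets:
		//
		//   le:     [0.1, 0.5, 1.0, +Inf]
		//   counts: [ 50,  90,  99,  100]
		//   p95 lies between le=0.5 (count 90) and le=1.0 (count 99):
		//   0.5 + (1.0 - 0.5) * (95 - 90) / (99 - 90) ≈ 0.78
		//
		// This is why the pipeline force-adds `le` to the GROUP BY and rewrites both
		// aggregations to sum for percentile queries on non-exponential histograms.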
+		{
+			name:        "test_gauge_avg_sum",
+			requestType: qbtypes.RequestTypeTimeSeries,
+			query: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
+				Signal:       telemetrytypes.SignalMetrics,
+				StepInterval: qbtypes.Step{Duration: 30 * time.Second},
+				Aggregations: []qbtypes.MetricAggregation{
+					{
+						MetricName:       "system.memory.usage",
+						Type:             metrictypes.GaugeType,
+						Temporality:      metrictypes.Unspecified,
+						TimeAggregation:  metrictypes.TimeAggregationAvg,
+						SpaceAggregation: metrictypes.SpaceAggregationSum,
+					},
+				},
+				Filter: &qbtypes.Filter{
+					Expression: "host.name = 'big-data-node-1'",
+				},
+				Limit: 10,
+				GroupBy: []qbtypes.GroupByKey{
+					{
+						TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{
+							Name: "host.name",
+						},
+					},
+				},
+			},
+			expected: qbtypes.Statement{
+				Query: "WITH __time_series_cte AS (SELECT fingerprint, JSONExtractString(labels, 'host.name') AS `host.name` FROM signoz_metrics.time_series_v4_6hrs WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli <= ? AND __normalized = ? AND JSONExtractString(labels, 'host.name') = ? GROUP BY ALL), __temporal_aggregation_cte AS (SELECT fingerprint, toStartOfInterval(toDateTime(intDiv(unix_milli, 1000)), toIntervalSecond(30)) AS ts, `host.name`, avg(value) AS per_series_value FROM signoz_metrics.distributed_samples_v4 AS points INNER JOIN __time_series_cte ON points.fingerprint = __time_series_cte.fingerprint WHERE metric_name IN (?) AND unix_milli >= ? AND unix_milli < ? GROUP BY ALL ORDER BY fingerprint, ts), __spatial_aggregation_cte AS (SELECT ts, `host.name`, sum(per_series_value) AS value FROM __temporal_aggregation_cte WHERE isNaN(per_series_value) = ? GROUP BY ALL) SELECT * FROM __spatial_aggregation_cte",
+				Args:  []any{"system.memory.usage", uint64(1747936800000), uint64(1747983448000), false, "big-data-node-1", "system.memory.usage", uint64(1747947419000), uint64(1747983448000), 0},
+			},
+			expectedErr: nil,
+		},
@@ -94,7 +158,11 @@ func TestStatementBuilder(t *testing.T) {
 	fm := NewFieldMapper()
 	cb := NewConditionBuilder(fm)
 	mockMetadataStore := telemetrytypestest.NewMockMetadataStore()
-	mockMetadataStore.KeysMap = buildCompleteFieldKeyMap()
+	keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
+	if err != nil {
+		t.Fatalf("failed to load field keys: %v", err)
+	}
+	mockMetadataStore.KeysMap = keys

 	aggExprRewriter := querybuilder.NewAggExprRewriter(nil, fm, cb, "", nil)
@@ -115,10 +183,6 @@ func TestStatementBuilder(t *testing.T) {
 				require.Error(t, err)
 				require.Contains(t, err.Error(), c.expectedErr.Error())
 			} else {
-
-				fmt.Println(q.Query)
-				fmt.Println(q.Args)
-
 				require.NoError(t, err)
 				require.Equal(t, c.expected.Query, q.Query)
 				require.Equal(t, c.expected.Args, q.Args)
diff --git a/pkg/telemetrymetrics/test_data.go b/pkg/telemetrymetrics/test_data.go
deleted file mode 100644
index dfc9e7ecb1..0000000000
--- a/pkg/telemetrymetrics/test_data.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package telemetrymetrics
-
-import (
-	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
-)
-
-func buildCompleteFieldKeyMap() map[string][]*telemetrytypes.TelemetryFieldKey {
-	keysMap := map[string][]*telemetrytypes.TelemetryFieldKey{
-		"service.name": {
-			{
-				Name:          "service.name",
-				FieldContext:  telemetrytypes.FieldContextResource,
-				FieldDataType: telemetrytypes.FieldDataTypeString,
-			},
-		},
-		"http.request.method": {
-			{
-				Name:          "http.request.method",
-				FieldContext:  telemetrytypes.FieldContextAttribute,
-				FieldDataType: telemetrytypes.FieldDataTypeString,
-			},
-		},
-	}
-	for _, keys := range keysMap {
-		for _, key := range keys {
-			key.Signal = telemetrytypes.SignalMetrics
-		}
-	}
-	return keysMap
-}
diff --git a/pkg/telemetrymetrics/testdata/keys_map.json b/pkg/telemetrymetrics/testdata/keys_map.json
new file mode 100644
index 0000000000..d1c27a3bf4
--- /dev/null
+++ b/pkg/telemetrymetrics/testdata/keys_map.json
@@ -0,0 +1,34 @@
+{
+    "service.name": [
+        {
+            "name": "service.name",
+            "fieldContext": "resource",
+            "fieldDataType": "string",
+            "signal": "metrics"
+        }
+    ],
+    "http.request.method": [
+        {
+            "name": "http.request.method",
+            "fieldContext": "attribute",
+            "fieldDataType": "string",
+            "signal": "metrics"
+        }
+    ],
+    "http.response.status_code": [
+        {
+            "name": "http.response.status_code",
+            "fieldContext": "attribute",
+            "fieldDataType": "int",
+            "signal": "metrics"
+        }
+    ],
+    "host.name": [
+        {
+            "name": "host.name",
+            "fieldContext": "resource",
+            "fieldDataType": "string",
+            "signal": "metrics"
+        }
+    ]
+}
\ No newline at end of file
diff --git a/pkg/types/metrictypes/metrictypes.go b/pkg/types/metrictypes/metrictypes.go
index cd2aa6da84..61a8e9284a 100644
--- a/pkg/types/metrictypes/metrictypes.go
+++ b/pkg/types/metrictypes/metrictypes.go
@@ -92,22 +92,16 @@ func (s SpaceAggregation) Percentile() float64 {
 }

 // MetricTableHints is a struct that contains tables to use instead of the derived tables
-// from the start and end time
+// from the start and end time; for internal use only, when we need to override the derived tables
 type MetricTableHints struct {
 	TimeSeriesTableName string
 	SamplesTableName    string
 }

-// Certain OTEL metrics encode the state in the value of the metric, which is in general
-// a bad modelling (presumably coming from some vendor) and makes it hard to write the aggregation queries
-//
-// (should have been a key:value with 0, 1 ), example: (pod_status: 0, 1, 2, 3, 4, 5)
-//
-// they are better modelled as pod_status: (state: running, pending, terminated, etc)
-// the value would be 0 or 1, if the value is 1, and the state is pending, then it indicates pod is pending.
-//
-// however, there have been some metrics that do this, and we need to support them.
-// This is workaround for those metrics.
+// Until recently, certain OTEL metrics encoded a state in the value of the metric (presumably
+// following some vendor's convention), which is bad modelling in general and makes the
+// aggregation queries hard to write. While this is no longer the case, some existing metrics
+// still do this, and we need a way to support them.
+// This is a workaround for those metrics.
 type MetricValueFilter struct {
 	Value float64
 }
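To illustrate the MetricValueFilter workaround with the pod_status example from the removed comment (a sketch, not part of the patch; the state-to-number mapping is made up):

package sketch

import "github.com/SigNoz/signoz/pkg/types/metrictypes"

// A metric like pod_status encodes states in the sample value (0, 1, 2, ...)
// instead of a state label with 0/1 values. To query only one state, the
// value is pinned; here 2 standing for "pending" is purely illustrative.
var pendingOnly = &metrictypes.MetricValueFilter{Value: 2}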
diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go
index 96025c14b3..fe0c80a2bc 100644
--- a/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go
+++ b/pkg/types/querybuildertypes/querybuildertypesv5/builder_elements.go
@@ -153,7 +153,7 @@ type MetricAggregation struct {
 	// metric to query
 	MetricName string `json:"metricName"`
 	// type of the metric
-	Type metrictypes.Type `json:"type"`
+	Type metrictypes.Type `json:"-"`
 	// temporality to apply to the query
 	Temporality metrictypes.Temporality `json:"temporality"`
 	// time aggregation to apply to the query
diff --git a/pkg/types/telemetrytypes/telemetrytypestest/load_keys.go b/pkg/types/telemetrytypes/telemetrytypestest/load_keys.go
new file mode 100644
index 0000000000..0aeafd0934
--- /dev/null
+++ b/pkg/types/telemetrytypes/telemetrytypestest/load_keys.go
@@ -0,0 +1,37 @@
+package telemetrytypestest
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+
+	"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
+)
+
+// LoadFieldKeysFromJSON loads telemetry field keys from a JSON file
+func LoadFieldKeysFromJSON(filePath string) (map[string][]*telemetrytypes.TelemetryFieldKey, error) {
+	// Read the JSON file
+	jsonData, err := os.ReadFile(filePath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read JSON file: %w", err)
+	}
+
+	// Parse JSON directly into the target format using built-in unmarshaling
+	var result map[string][]*telemetrytypes.TelemetryFieldKey
+	if err := json.Unmarshal(jsonData, &result); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
+	}
+
+	return result, nil
+}
+
+// LoadFieldKeysFromJSONString loads telemetry field keys from a JSON string
+func LoadFieldKeysFromJSONString(jsonStr string) (map[string][]*telemetrytypes.TelemetryFieldKey, error) {
+	// Parse JSON directly into the target format using built-in unmarshaling
+	var result map[string][]*telemetrytypes.TelemetryFieldKey
+	if err := json.Unmarshal([]byte(jsonStr), &result); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
+	}
+
+	return result, nil
+}
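Two usage notes on the last two files, both sketches rather than part of the patch. First, the json:"-" tag on MetricAggregation.Type means the metric type is no longer read from API payloads; a "type" key in the request JSON is silently ignored and the field keeps its zero value (presumably it is populated server-side, e.g. from metric metadata):

package sketch

import "encoding/json"

// aggregation is a trimmed-down stand-in for MetricAggregation,
// just to show the effect of the `json:"-"` tag.
type aggregation struct {
	MetricName string `json:"metricName"`
	Type       string `json:"-"`
}

func example() (string, string) {
	var a aggregation
	// the "type" key is ignored during unmarshaling; Type stays ""
	_ = json.Unmarshal([]byte(`{"metricName":"signoz_calls_total","type":"sum"}`), &a)
	return a.MetricName, a.Type // "signoz_calls_total", ""
}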
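Second, a usage sketch for the new test helpers, mirroring how stmt_builder_test.go loads the fixture (it assumes the testdata path is relative to the test's working directory, as in Go tests generally):

package sketch

import (
	"testing"

	"github.com/SigNoz/signoz/pkg/types/telemetrytypes/telemetrytypestest"
)

func TestLoadKeys(t *testing.T) {
	// from a file, as the statement builder tests do
	keys, err := telemetrytypestest.LoadFieldKeysFromJSON("testdata/keys_map.json")
	if err != nil {
		t.Fatalf("failed to load field keys: %v", err)
	}
	if _, ok := keys["service.name"]; !ok {
		t.Fatalf("expected service.name in fixture")
	}

	// or from an inline string, handy for one-off cases
	inline := `{"host.name":[{"name":"host.name","fieldContext":"resource","fieldDataType":"string","signal":"metrics"}]}`
	if _, err := telemetrytypestest.LoadFieldKeysFromJSONString(inline); err != nil {
		t.Fatalf("failed to parse inline keys: %v", err)
	}
}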