feat(summary-view): add summary view endpoints

This commit is contained in:
aniketio-ctrl 2025-02-20 13:49:44 +05:30 committed by GitHub
parent 407654e68e
commit 972a7a9dac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1344 additions and 3 deletions

View File

@ -375,6 +375,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
apiHandler.RegisterWebSocketPaths(r, am)
apiHandler.RegisterMessagingQueuesRoutes(r, am)
apiHandler.RegisterThirdPartyApiRoutes(r, am)
apiHandler.MetricExplorerRoutes(r, am)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},

2
go.mod
View File

@ -72,6 +72,7 @@ require (
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/net v0.33.0
golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.10.0
golang.org/x/text v0.21.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.2
@ -263,7 +264,6 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.28.0 // indirect

View File

@ -16,6 +16,8 @@ import (
"sync"
"time"
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
@ -1141,7 +1143,7 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
searchSpansResult := []model.SearchSpansResult{
{
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"},
@ -1289,7 +1291,7 @@ func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.Sea
func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams,
smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string,
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) {
if r.useTraceNewSchema {
return r.SearchTracesV2(ctx, params, smartTraceAlgorithm)
@ -5594,3 +5596,547 @@ func (r *ClickHouseReader) SubscribeToQueryProgress(
) (<-chan model.QueryProgress, func(), *model.ApiError) {
return r.queryProgressTracker.SubscribeToQueryProgress(queryId)
}
// GetAllMetricFilterAttributeKeys returns the distinct label keys present in
// the one-day time-series table whose name matches req.SearchText. Internal
// labels (prefix "__") are excluded in SQL; when skipDotNames is true, keys
// containing a "." are additionally skipped in Go. req.Limit, when non-zero,
// caps the number of keys fetched.
func (r *ClickHouseReader) GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError) {
	var response []v3.AttributeKey
	query := fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE unix_milli >= $1 GROUP BY tagKeys) WHERE distinctTagKey ILIKE $2 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	rows, err := r.db.Query(ctx, query, common.PastDayRoundOff(), fmt.Sprintf("%%%s%%", req.SearchText))
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// BUG FIX: the result set was previously never closed, leaking the
	// underlying ClickHouse cursor on every call.
	defer rows.Close()
	var attributeKey string
	for rows.Next() {
		if err := rows.Scan(&attributeKey); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		if skipDotNames && strings.Contains(attributeKey, ".") {
			continue
		}
		response = append(response, v3.AttributeKey{
			Key:      attributeKey,
			DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72.
			Type:     v3.AttributeKeyTypeTag,
			IsColumn: false,
		})
	}
	// BUG FIX: iteration errors were previously swallowed, which could have
	// silently truncated the key list.
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return &response, nil
}
// GetAllMetricFilterAttributeValues returns the distinct values of the label
// req.FilterKey over the past day that match req.SearchText (case-insensitive
// substring). req.Limit, when non-zero, caps the result size.
func (r *ClickHouseReader) GetAllMetricFilterAttributeValues(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
	var attributeValues []string
	query := fmt.Sprintf("SELECT JSONExtractString(labels, $1) AS tagValue FROM %s.%s WHERE JSONExtractString(labels, $2) ILIKE $3 AND unix_milli >= $4 GROUP BY tagValue", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	rows, err := r.db.Query(ctx, query, req.FilterKey, req.FilterKey, fmt.Sprintf("%%%s%%", req.SearchText), common.PastDayRoundOff())
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()
	var attributeValue string // fixed misspelled local "atrributeValue"
	for rows.Next() {
		if err := rows.Scan(&attributeValue); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		attributeValues = append(attributeValues, attributeValue)
	}
	// BUG FIX: surface iteration errors instead of ignoring them.
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return attributeValues, nil
}
// GetAllMetricFilterUnits lists the distinct metric units matching the search
// text, ordered alphabetically. req.Limit, when non-zero, caps the result.
func (r *ClickHouseReader) GetAllMetricFilterUnits(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
	var response []string
	query := fmt.Sprintf("SELECT DISTINCT unit FROM %s.%s WHERE unit ILIKE $1 AND unit IS NOT NULL ORDER BY unit", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// BUG FIX: rows was never closed, leaking the result set.
	defer rows.Close()
	var unit string
	for rows.Next() {
		if err := rows.Scan(&unit); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		response = append(response, unit)
	}
	// BUG FIX: check for errors encountered during iteration.
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return response, nil
}
// GetAllMetricFilterTypes lists the distinct metric types matching the search
// text, ordered alphabetically. req.Limit, when non-zero, caps the result.
func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) {
	var response []string
	query := fmt.Sprintf("SELECT DISTINCT type FROM %s.%s WHERE type ILIKE $1 AND type IS NOT NULL ORDER BY type", signozMetricDBName, signozTSTableNameV41Day)
	if req.Limit != 0 {
		query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
	}
	rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText))
	if err != nil {
		zap.L().Error("Error while executing query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// BUG FIX: rows was never closed, leaking the result set.
	defer rows.Close()
	var metricType string
	for rows.Next() {
		if err := rows.Scan(&metricType); err != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		response = append(response, metricType)
	}
	// BUG FIX: check for errors encountered during iteration.
	if err := rows.Err(); err != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return response, nil
}
// GetMetricsDataPointsAndLastReceived reports the total number of stored
// samples for metricName together with the unix-milli timestamp of the most
// recent sample.
func (r *ClickHouseReader) GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError) {
	query := fmt.Sprintf("SELECT COUNT(*) AS data_points, MAX(unix_milli) AS last_received_time FROM %s.%s WHERE metric_name = ?", signozMetricDBName, signozSampleTableName)
	var (
		dataPoints   uint64
		lastReceived int64 // MAX(unix_milli) scans as a signed integer
	)
	if err := r.db.QueryRow(ctx, query, metricName).Scan(&dataPoints, &lastReceived); err != nil {
		return 0, 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// Convert to uint64 to match the declared return type.
	return dataPoints, uint64(lastReceived), nil
}
// GetTotalTimeSeriesForMetricName reports the number of distinct fingerprints
// (time series) recorded for metricName in the one-week TS table.
func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) {
	query := fmt.Sprintf(`SELECT
		uniq(fingerprint) AS timeSeriesCount
	FROM %s.%s
	WHERE metric_name = ?;`, signozMetricDBName, signozTSTableNameV41Week)
	var count uint64
	if err := r.db.QueryRow(ctx, query, metricName).Scan(&count); err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return count, nil
}
// GetAttributesForMetricName returns, for every non-internal label key of the
// metric (keys starting with "__" are filtered out), up to 10000 distinct raw
// values plus the distinct-value count, ordered by that count descending.
func (r *ClickHouseReader) GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError) {
	query := fmt.Sprintf(`
	SELECT
		kv.1 AS key,
		arrayMap(x -> trim(BOTH '\"' FROM x), groupUniqArray(10000)(kv.2)) AS values,
		length(groupUniqArray(10000)(kv.2)) AS valueCount
	FROM %s.%s
	ARRAY JOIN arrayFilter(x -> NOT startsWith(x.1, '__'), JSONExtractKeysAndValuesRaw(labels)) AS kv
	WHERE metric_name = ?
	GROUP BY kv.1
	ORDER BY valueCount DESC;
	`, signozMetricDBName, signozTSTableNameV41Week)

	rows, queryErr := r.db.Query(ctx, query, metricName)
	if queryErr != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: queryErr}
	}
	defer rows.Close() // release the result set on every exit path

	var attributesList []metrics_explorer.Attribute
	for rows.Next() {
		var (
			key        string
			values     []string
			valueCount uint64
		)
		if scanErr := rows.Scan(&key, &values, &valueCount); scanErr != nil {
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: scanErr}
		}
		attributesList = append(attributesList, metrics_explorer.Attribute{
			Key:        key,
			Value:      values,
			ValueCount: valueCount,
		})
	}
	// Report any error that interrupted row iteration.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: iterErr}
	}
	return &attributesList, nil
}
// GetActiveTimeSeriesForMetricName counts the distinct fingerprints for
// metricName that received samples within the trailing duration window.
func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError) {
	milli := time.Now().Add(-duration).UnixMilli()
	// BUG FIX: metricName was previously interpolated into the SQL text with
	// fmt.Sprintf, which allowed SQL injection via the metric name. Bind it
	// as a query parameter like unix_milli instead.
	query := fmt.Sprintf("SELECT uniq(fingerprint) FROM %s.%s WHERE metric_name = ? and unix_milli >= ?", signozMetricDBName, signozTSTableNameV4)
	var timeSeries uint64
	// Using QueryRow instead of Select since we're only expecting a single value
	err := r.db.QueryRow(ctx, query, metricName, milli).Scan(&timeSeries)
	if err != nil {
		return 0, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return timeSeries, nil
}
// ListSummaryMetrics returns one page of metric summaries for the requested
// time range and filters. It runs two queries: the first resolves metric
// names, descriptions, types, units and time-series counts from the TS table;
// the second resolves sample counts and last-received timestamps from the
// samples table for exactly those metric names. Metrics with no samples in
// the window are dropped from the response.
func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
	conditions, _ := utils.BuildFilterConditions(&req.Filters, "t")
	whereClause := ""
	if conditions != nil {
		whereClause = "AND " + strings.Join(conditions, " AND ")
	}
	firstQueryLimit := req.Limit
	dataPointsOrder := false
	var orderByClauseFirstQuery string
	// NOTE(review): OrderBy.ColumnName and OrderBy.Order are interpolated
	// directly into the SQL; confirm they are validated against an
	// allow-list upstream before reaching this function.
	if req.OrderBy.ColumnName == "samples" {
		// Sample counts are only known after the second query, so over-fetch
		// (at least 50) ordered by time-series count as a proxy and re-sort
		// by samples at the end.
		dataPointsOrder = true
		orderByClauseFirstQuery = fmt.Sprintf("ORDER BY timeseries %s", req.OrderBy.Order)
		if req.Limit < 50 {
			firstQueryLimit = 50
		}
	} else if req.OrderBy.ColumnName == "metric_type" {
		orderByClauseFirstQuery = fmt.Sprintf("ORDER BY type %s", req.OrderBy.Order)
	} else {
		orderByClauseFirstQuery = fmt.Sprintf("ORDER BY %s %s", req.OrderBy.ColumnName, req.OrderBy.Order)
	}
	start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.EndD)
	sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.EndD)
	metricsQuery := fmt.Sprintf(
		`SELECT
			t.metric_name AS metric_name,
			ANY_VALUE(t.description) AS description,
			ANY_VALUE(t.type) AS type,
			ANY_VALUE(t.unit),
			uniq(t.fingerprint) AS timeseries,
			uniq(metric_name) OVER() AS total
		FROM %s.%s AS t
		WHERE unix_milli BETWEEN ? AND ?
		%s
		GROUP BY t.metric_name
		%s
		LIMIT %d OFFSET %d;`,
		signozMetricDBName, tsTable, whereClause, orderByClauseFirstQuery, firstQueryLimit, req.Offset)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, metricsQuery, start, end)
	if err != nil {
		zap.L().Error("Error executing metrics query", zap.Error(err))
		return &metrics_explorer.SummaryListMetricsResponse{}, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()
	var response metrics_explorer.SummaryListMetricsResponse
	var metricNames []string
	for rows.Next() {
		var metric metrics_explorer.MetricDetail
		if err := rows.Scan(&metric.MetricName, &metric.Description, &metric.Type, &metric.Unit, &metric.TimeSeries, &response.Total); err != nil {
			zap.L().Error("Error scanning metric row", zap.Error(err))
			return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		metricNames = append(metricNames, metric.MetricName)
		response.Metrics = append(response.Metrics, metric)
	}
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over metric rows", zap.Error(err))
		return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// No metrics in the window: nothing to enrich with sample data.
	if len(metricNames) == 0 {
		return &response, nil
	}
	metricsList := "'" + strings.Join(metricNames, "', '") + "'"
	if dataPointsOrder {
		orderByClauseFirstQuery = fmt.Sprintf("ORDER BY s.samples %s", req.OrderBy.Order)
	} else {
		orderByClauseFirstQuery = ""
	}
	sampleQuery := fmt.Sprintf(
		`SELECT
			s.samples,
			s.metric_name,
			s.unix_milli AS lastReceived
		FROM (
			SELECT
				metric_name,
				%s AS samples,
				max(unix_milli) as unix_milli
			FROM %s.%s
			WHERE fingerprint IN (
				SELECT fingerprint
				FROM %s.%s
				WHERE unix_milli BETWEEN ? AND ?
				%s
				AND metric_name IN (%s)
				GROUP BY fingerprint
			)
			AND metric_name in (%s)
			GROUP BY metric_name
		) AS s
		%s
		LIMIT %d OFFSET %d;`,
		countExp, signozMetricDBName, sampleTable, signozMetricDBName, localTsTable,
		whereClause, metricsList, metricsList, orderByClauseFirstQuery,
		req.Limit, req.Offset)
	// BUG FIX: the sample query has exactly two bind placeholders, but the
	// old code appended start/end onto the args slice already holding the
	// first query's start/end, passing four arguments for two placeholders.
	// Bind exactly the two values this query needs.
	rows, err = r.db.Query(valueCtx, sampleQuery, start, end)
	if err != nil {
		zap.L().Error("Error executing samples query", zap.Error(err))
		return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()
	samplesMap := make(map[string]uint64)
	lastReceivedMap := make(map[string]int64)
	for rows.Next() {
		var samples uint64
		var metricName string
		var lastReceived int64
		if err := rows.Scan(&samples, &metricName, &lastReceived); err != nil {
			zap.L().Error("Error scanning sample row", zap.Error(err))
			return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		samplesMap[metricName] = samples
		lastReceivedMap[metricName] = lastReceived
	}
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over sample rows", zap.Error(err))
		return &response, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// Keep only metrics that actually had samples in the window, attaching
	// their sample counts and last-received timestamps.
	var filteredMetrics []metrics_explorer.MetricDetail
	for i := range response.Metrics {
		if samples, exists := samplesMap[response.Metrics[i].MetricName]; exists {
			response.Metrics[i].Samples = samples
			if lastReceived, exists := lastReceivedMap[response.Metrics[i].MetricName]; exists {
				response.Metrics[i].LastReceived = lastReceived
			}
			filteredMetrics = append(filteredMetrics, response.Metrics[i])
		}
	}
	response.Metrics = filteredMetrics
	// When ordering by samples, the first query over-fetched by timeseries;
	// re-sort the final page by the now-known sample counts.
	if dataPointsOrder {
		sort.Slice(response.Metrics, func(i, j int) bool {
			return response.Metrics[i].Samples > response.Metrics[j].Samples
		})
	}
	return &response, nil
}
// GetMetricsTimeSeriesPercentage returns the top req.Limit metrics by
// distinct-fingerprint (time-series) count in [req.Start, req.EndD], each with
// its count and its share (percent) of all fingerprints in the same window.
func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
	var args []interface{}
	// Build filters dynamically
	conditions, _ := utils.BuildFilterConditions(&req.Filters, "")
	whereClause := ""
	if len(conditions) > 0 {
		whereClause = "AND " + strings.Join(conditions, " AND ")
	}
	// Pick the TS table whose granularity covers the requested range.
	start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.EndD)
	// Construct the query without backticks
	query := fmt.Sprintf(`
	SELECT
		metric_name,
		total_value,
		(total_value * 100.0 / total_time_series) AS percentage
	FROM (
		SELECT
			metric_name,
			uniq(fingerprint) AS total_value,
			(SELECT uniq(fingerprint)
			FROM %s.%s
			WHERE unix_milli BETWEEN ? AND ?) AS total_time_series
		FROM %s.%s
		WHERE unix_milli BETWEEN ? AND ? %s
		GROUP BY metric_name
	)
	ORDER BY percentage DESC
	LIMIT %d;`,
		signozMetricDBName,
		tsTable,
		signozMetricDBName,
		tsTable,
		whereClause,
		req.Limit,
	)
	// Bind order must match placeholder order: subquery first, outer next.
	args = append(args,
		start, end, // For total_cardinality subquery
		start, end, // For main query
	)
	// NOTE(review): string context key triggers vet's SA1029; the reader side
	// presumably reads the same literal key — confirm before changing.
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	rows, err := r.db.Query(valueCtx, query, args...)
	if err != nil {
		zap.L().Error("Error executing cardinality query", zap.Error(err), zap.String("query", query))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()
	var heatmap []metrics_explorer.TreeMapResponseItem
	for rows.Next() {
		var item metrics_explorer.TreeMapResponseItem
		if err := rows.Scan(&item.MetricName, &item.TotalValue, &item.Percentage); err != nil {
			zap.L().Error("Error scanning row", zap.Error(err))
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		heatmap = append(heatmap, item)
	}
	// Surface any error that interrupted iteration.
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over rows", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return &heatmap, nil
}
// GetMetricsSamplesPercentage returns the top req.Limit metrics by sample
// count in [req.Start, req.EndD], each with its sample count and its share
// (percent) of all samples in the window. Candidate metrics are pre-selected
// by time-series count (top 50+Limit) before sample counts are computed.
// NOTE: returns (nil, nil) when no candidate metrics match — callers must
// nil-check before dereferencing.
func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) {
	var args []interface{}
	// Build the filter conditions
	conditions, _ := utils.BuildFilterConditions(&req.Filters, "t")
	whereClause := ""
	if conditions != nil {
		whereClause = "AND " + strings.Join(conditions, " AND ")
	}
	// Determine time range and tables to use
	start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.EndD)
	sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.EndD)
	// Construct the metrics query; over-fetch candidates so the final
	// sample-ordered page is unlikely to miss a heavy metric.
	queryLimit := 50 + req.Limit
	metricsQuery := fmt.Sprintf(
		`SELECT
			t.metric_name AS metric_name,
			uniq(t.fingerprint) AS timeSeries
		FROM %s.%s AS t
		WHERE unix_milli BETWEEN ? AND ?
		%s
		GROUP BY t.metric_name
		ORDER BY timeSeries DESC
		LIMIT %d;`,
		signozMetricDBName, tsTable, whereClause, queryLimit,
	)
	args = append(args, start, end)
	valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
	// Execute the metrics query
	rows, err := r.db.Query(valueCtx, metricsQuery, args...)
	if err != nil {
		zap.L().Error("Error executing metrics query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()
	// Process the query results
	var metricNames []string
	for rows.Next() {
		var metricName string
		var timeSeries uint64
		if err := rows.Scan(&metricName, &timeSeries); err != nil {
			zap.L().Error("Error scanning metric row", zap.Error(err))
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		metricNames = append(metricNames, metricName)
	}
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over metric rows", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	// If no metrics found, return early
	if len(metricNames) == 0 {
		return nil, nil
	}
	// Format metric names for query
	metricsList := "'" + strings.Join(metricNames, "', '") + "'"
	// Construct the sample percentage query
	sampleQuery := fmt.Sprintf(
		`WITH TotalSamples AS (
			SELECT %s AS total_samples
			FROM %s.%s
			WHERE unix_milli BETWEEN ? AND ?
		)
		SELECT
			s.samples,
			s.metric_name,
			COALESCE((s.samples * 100.0 / t.total_samples), 0) AS percentage
		FROM
		(
			SELECT
				metric_name,
				%s AS samples
			FROM %s.%s
			WHERE fingerprint IN
			(
				SELECT fingerprint
				FROM %s.%s
				WHERE unix_milli BETWEEN ? AND ?
				%s
				AND metric_name IN (%s)
				GROUP BY fingerprint
			)
			AND metric_name IN (%s)
			GROUP BY metric_name
		) AS s
		JOIN TotalSamples t ON 1 = 1
		ORDER BY percentage DESC
		LIMIT %d;`,
		countExp, signozMetricDBName, sampleTable, // Total samples
		countExp, signozMetricDBName, sampleTable, // Inner select samples
		signozMetricDBName, localTsTable, whereClause, metricsList, // Subquery conditions
		metricsList, req.Limit, // Final conditions
	)
	// args becomes (start, end, start, end): the first pair binds the
	// TotalSamples CTE, the second pair the fingerprint subquery — this
	// matches the four placeholders above in order.
	args = append(args, start, end)
	// Execute the sample percentage query
	rows, err = r.db.Query(valueCtx, sampleQuery, args...)
	if err != nil {
		zap.L().Error("Error executing samples query", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	defer rows.Close()
	// Process the results into a response slice
	var heatmap []metrics_explorer.TreeMapResponseItem
	for rows.Next() {
		var item metrics_explorer.TreeMapResponseItem
		if err := rows.Scan(&item.TotalValue, &item.MetricName, &item.Percentage); err != nil {
			zap.L().Error("Error scanning row", zap.Error(err))
			return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		heatmap = append(heatmap, item)
	}
	if err := rows.Err(); err != nil {
		zap.L().Error("Error iterating over sample rows", zap.Error(err))
		return nil, &model.ApiError{Typ: "ClickHouseError", Err: err}
	}
	return &heatmap, nil
}

View File

@ -537,3 +537,87 @@ func countPanelsInDashboard(inputData map[string]interface{}) model.DashboardsIn
LogsPanelsWithAttrContainsOp: logsPanelsWithAttrContains,
}
}
// GetDashboardsWithMetricName walks every saved dashboard and collects the
// widgets whose metrics builder queries aggregate over metricName. Each hit
// is returned as a map with dashboard_id, dashboard_title, widget_id and
// widget_title; one entry is emitted per matching query section.
func GetDashboardsWithMetricName(ctx context.Context, metricName string) ([]map[string]string, *model.ApiError) {
	type dashboardRow struct {
		Uuid string          `db:"uuid"`
		Data json.RawMessage `db:"data"`
	}
	// Fetch every dashboard; matching happens in Go on the JSON payload.
	var storedDashboards []dashboardRow
	if err := db.Select(&storedDashboards, `SELECT uuid, data FROM dashboards`); err != nil {
		zap.L().Error("Error in getting dashboards", zap.Error(err))
		return nil, &model.ApiError{Typ: model.ErrorExec, Err: err}
	}
	var matches []map[string]string
	for _, row := range storedDashboards {
		var payload map[string]interface{}
		if err := json.Unmarshal(row.Data, &payload); err != nil {
			// Skip dashboards with undecodable JSON, as before.
			continue
		}
		dashTitle, _ := payload["title"].(string)
		widgets, ok := payload["widgets"].([]interface{})
		if !ok {
			continue
		}
		for _, rawWidget := range widgets {
			widget, ok := rawWidget.(map[string]interface{})
			if !ok {
				continue
			}
			widgetTitle, _ := widget["title"].(string)
			widgetID, _ := widget["id"].(string)
			querySection, ok := widget["query"].(map[string]interface{})
			if !ok {
				continue
			}
			builder, ok := querySection["builder"].(map[string]interface{})
			if !ok {
				continue
			}
			queryData, ok := builder["queryData"].([]interface{})
			if !ok {
				continue
			}
			for _, rawEntry := range queryData {
				entry, ok := rawEntry.(map[string]interface{})
				if !ok {
					continue
				}
				// Only metrics-sourced builder queries are considered.
				if dataSource, ok := entry["dataSource"].(string); !ok || dataSource != "metrics" {
					continue
				}
				aggregateAttr, ok := entry["aggregateAttribute"].(map[string]interface{})
				if !ok {
					continue
				}
				if key, ok := aggregateAttr["key"].(string); ok && strings.TrimSpace(key) == metricName {
					matches = append(matches, map[string]string{
						"dashboard_id":    row.Uuid,
						"widget_title":    widgetTitle,
						"widget_id":       widgetID,
						"dashboard_title": dashTitle,
					})
				}
			}
		}
	}
	return matches, nil
}

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
"io"
"math"
"net/http"
@ -127,6 +128,8 @@ type APIHandler struct {
statefulsetsRepo *inframetrics.StatefulSetsRepo
jobsRepo *inframetrics.JobsRepo
SummaryService *metricsexplorer.SummaryService
pvcsRepo *inframetrics.PvcsRepo
JWT *authtypes.JWT
@ -215,6 +218,8 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2)
jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2)
pvcsRepo := inframetrics.NewPvcsRepo(opts.Reader, querierv2)
//explorerCache := metricsexplorer.NewExplorerCache(metricsexplorer.WithCache(opts.Cache))
summaryService := metricsexplorer.NewSummaryService(opts.Reader, querierv2)
aH := &APIHandler{
reader: opts.Reader,
@ -244,6 +249,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
jobsRepo: jobsRepo,
pvcsRepo: pvcsRepo,
JWT: opts.JWT,
SummaryService: summaryService,
}
logsQueryBuilder := logsv3.PrepareLogsQuery
@ -606,6 +612,24 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) {
router.HandleFunc("/api/v1/changePassword/{id}", am.SelfAccess(aH.changePassword)).Methods(http.MethodPost)
}
// MetricExplorerRoutes registers the metrics-explorer endpoints on router.
// Every route is gated behind view access.
func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *AuthMiddleware) {
	router.HandleFunc("/api/v1/metrics/filters/keys", am.ViewAccess(ah.FilterKeysSuggestion)).Methods(http.MethodGet)
	router.HandleFunc("/api/v1/metrics/filters/values", am.ViewAccess(ah.FilterValuesSuggestion)).Methods(http.MethodPost)
	router.HandleFunc("/api/v1/metrics/{metric_name}/metadata", am.ViewAccess(ah.GetMetricsDetails)).Methods(http.MethodGet)
	router.HandleFunc("/api/v1/metrics", am.ViewAccess(ah.ListMetrics)).Methods(http.MethodPost)
	router.HandleFunc("/api/v1/metrics/treemap", am.ViewAccess(ah.GetTreeMap)).Methods(http.MethodPost)
}
func Intersection(a, b []int) (c []int) {
m := make(map[int]bool)

View File

@ -0,0 +1,70 @@
package metricsexplorer
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
)
// ParseFilterKeySuggestions extracts the searchText and limit query
// parameters; limit falls back to 50 when absent or non-numeric.
func ParseFilterKeySuggestions(r *http.Request) (*metrics_explorer.FilterKeyRequest, *model.ApiError) {
	params := r.URL.Query()
	limit, convErr := strconv.Atoi(params.Get("limit"))
	if convErr != nil {
		limit = 50
	}
	return &metrics_explorer.FilterKeyRequest{
		Limit:      limit,
		SearchText: params.Get("searchText"),
	}, nil
}
// ParseFilterValueSuggestions decodes the JSON request body into a
// FilterValueRequest, returning a bad-data error on malformed input.
func ParseFilterValueSuggestions(r *http.Request) (*metrics_explorer.FilterValueRequest, *model.ApiError) {
	var req metrics_explorer.FilterValueRequest
	decodeErr := json.NewDecoder(r.Body).Decode(&req)
	if decodeErr != nil {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", decodeErr)}
	}
	return &req, nil
}
// ParseSummaryListMetricsParams decodes the JSON request body into a
// SummaryListMetricsRequest and applies defaults: ORDER BY timeseries DESC
// when no ordering is supplied, and a limit of 10 when none is supplied.
func ParseSummaryListMetricsParams(r *http.Request) (*metrics_explorer.SummaryListMetricsRequest, *model.ApiError) {
	// BUG FIX: decoding into a nil *pointer* left it nil for a JSON "null"
	// body, panicking on the OrderBy access below. Decode into a value.
	var listMetricsParams metrics_explorer.SummaryListMetricsRequest
	// parse the request body
	if err := json.NewDecoder(r.Body).Decode(&listMetricsParams); err != nil {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
	}
	if listMetricsParams.OrderBy.ColumnName == "" || listMetricsParams.OrderBy.Order == "" {
		listMetricsParams.OrderBy.ColumnName = "timeseries" // DEFAULT ORDER BY
		listMetricsParams.OrderBy.Order = v3.DirectionDesc
	}
	if listMetricsParams.Limit == 0 {
		listMetricsParams.Limit = 10 // DEFAULT LIMIT
	}
	return &listMetricsParams, nil
}
// ParseTreeMapMetricsParams decodes the JSON request body into a
// TreeMapMetricsRequest, defaulting the limit to 10 when unset.
func ParseTreeMapMetricsParams(r *http.Request) (*metrics_explorer.TreeMapMetricsRequest, *model.ApiError) {
	// BUG FIX: decoding into a nil *pointer* left it nil for a JSON "null"
	// body, panicking on the Limit access below. Decode into a value.
	var treeMapMetricParams metrics_explorer.TreeMapMetricsRequest
	// parse the request body
	if err := json.NewDecoder(r.Body).Decode(&treeMapMetricParams); err != nil {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
	}
	if treeMapMetricParams.Limit == 0 {
		treeMapMetricParams.Limit = 10
	}
	return &treeMapMetricParams, nil
}

View File

@ -0,0 +1,209 @@
package metricsexplorer
import (
"context"
"encoding/json"
"time"
"go.uber.org/zap"
"go.signoz.io/signoz/pkg/query-service/app/dashboards"
"go.signoz.io/signoz/pkg/query-service/interfaces"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"golang.org/x/sync/errgroup"
)
// SummaryService serves the metrics-explorer summary endpoints, combining
// direct reader access with the v2 querier.
type SummaryService struct {
	reader    interfaces.Reader  // direct ClickHouse/metadata access
	querierV2 interfaces.Querier // v2 query engine (currently unused here)
}
// NewSummaryService wires a SummaryService with its reader and querier
// dependencies.
func NewSummaryService(reader interfaces.Reader, querierV2 interfaces.Querier) *SummaryService {
	return &SummaryService{reader: reader, querierV2: querierV2}
}
// FilterKeys returns the label attribute keys matching the request (dot-named
// keys are skipped) together with the fixed set of filterable metric columns.
func (receiver *SummaryService) FilterKeys(ctx context.Context, params *metrics_explorer.FilterKeyRequest) (*metrics_explorer.FilterKeyResponse, *model.ApiError) {
	keys, apiError := receiver.reader.GetAllMetricFilterAttributeKeys(ctx, params, true)
	if apiError != nil {
		return nil, apiError
	}
	// Collect the supported column-level filters alongside the label keys.
	var columnFilters []string
	for name := range metrics_explorer.AvailableColumnFilterMap {
		columnFilters = append(columnFilters, name)
	}
	response := metrics_explorer.FilterKeyResponse{
		AttributeKeys: *keys,
		MetricColumns: columnFilters,
	}
	return &response, nil
}
// FilterValues resolves suggestion values for a single filter key. The
// reserved keys "metric_name", "metric_unit" and "metric_type" are served
// from dedicated reader calls; any other key is treated as a label attribute.
func (receiver *SummaryService) FilterValues(ctx context.Context, params *metrics_explorer.FilterValueRequest) (*metrics_explorer.FilterValueResponse, *model.ApiError) {
	var response metrics_explorer.FilterValueResponse
	switch params.FilterKey {
	case "metric_name":
		request := v3.AggregateAttributeRequest{
			DataSource: v3.DataSourceMetrics,
			SearchText: params.SearchText,
			Limit:      params.Limit,
		}
		attributes, err := receiver.reader.GetMetricAggregateAttributes(ctx, &request, true)
		if err != nil {
			return nil, model.InternalError(err)
		}
		var names []string
		for _, item := range attributes.AttributeKeys {
			names = append(names, item.Key)
		}
		response.FilterValues = names
		return &response, nil
	case "metric_unit":
		units, err := receiver.reader.GetAllMetricFilterUnits(ctx, params)
		if err != nil {
			return nil, err
		}
		response.FilterValues = units
		return &response, nil
	case "metric_type":
		types, err := receiver.reader.GetAllMetricFilterTypes(ctx, params)
		if err != nil {
			return nil, err
		}
		response.FilterValues = types
		return &response, nil
	default:
		// Fall back to label-attribute value suggestions.
		values, err := receiver.reader.GetAllMetricFilterAttributeValues(ctx, params)
		if err != nil {
			return nil, err
		}
		response.FilterValues = values
		return &response, nil
	}
}
// GetMetricsSummary assembles the metric-details DTO for metricName by
// fanning out six independent reader calls via an errgroup (metadata, sample
// counts, total and active time series, attributes, referencing dashboards).
// The first failure cancels the remaining calls via the derived context.
// Each goroutine writes a disjoint set of DTO fields, so no locking is used.
func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricName string) (metrics_explorer.MetricDetailsDTO, *model.ApiError) {
	var metricDetailsDTO metrics_explorer.MetricDetailsDTO
	g, ctx := errgroup.WithContext(ctx)
	// Call 1: GetMetricMetadata
	g.Go(func() error {
		// NOTE(review): metricName is passed for both parameters — confirm
		// GetMetricMetadata intends metric name as its second argument too.
		metadata, err := receiver.reader.GetMetricMetadata(ctx, metricName, metricName)
		if err != nil {
			return &model.ApiError{Typ: "ClickHouseError", Err: err}
		}
		metricDetailsDTO.Name = metricName
		metricDetailsDTO.Unit = metadata.Unit
		metricDetailsDTO.Description = metadata.Description
		metricDetailsDTO.Type = metadata.Type
		metricDetailsDTO.Metadata.MetricType = metadata.Type
		metricDetailsDTO.Metadata.Description = metadata.Description
		metricDetailsDTO.Metadata.Unit = metadata.Unit
		return nil
	})
	// Call 2: GetMetricsDataPointsAndLastReceived
	g.Go(func() error {
		dataPoints, lastReceived, err := receiver.reader.GetMetricsDataPointsAndLastReceived(ctx, metricName)
		if err != nil {
			return err
		}
		metricDetailsDTO.Samples = dataPoints
		metricDetailsDTO.LastReceived = lastReceived
		return nil
	})
	// Call 3: GetTotalTimeSeriesForMetricName
	g.Go(func() error {
		totalSeries, err := receiver.reader.GetTotalTimeSeriesForMetricName(ctx, metricName)
		if err != nil {
			return err
		}
		metricDetailsDTO.TimeSeriesTotal = totalSeries
		return nil
	})
	// Call 4: GetActiveTimeSeriesForMetricName ("active" = seen in last 2h)
	g.Go(func() error {
		activeSeries, err := receiver.reader.GetActiveTimeSeriesForMetricName(ctx, metricName, 120*time.Minute)
		if err != nil {
			return err
		}
		metricDetailsDTO.TimeSeriesActive = activeSeries
		return nil
	})
	// Call 5: GetAttributesForMetricName
	g.Go(func() error {
		attributes, err := receiver.reader.GetAttributesForMetricName(ctx, metricName)
		if err != nil {
			return err
		}
		if attributes != nil {
			metricDetailsDTO.Attributes = *attributes
		}
		return nil
	})
	// Call 6: GetDashboardsWithMetricName
	g.Go(func() error {
		data, err := dashboards.GetDashboardsWithMetricName(ctx, metricName)
		if err != nil {
			return err
		}
		if data != nil {
			// Round-trip through JSON to convert the generic maps into the
			// typed Dashboard DTOs.
			jsonData, err := json.Marshal(data)
			if err != nil {
				zap.L().Error("Error marshalling data:", zap.Error(err))
				return &model.ApiError{Typ: "MarshallingErr", Err: err}
			}
			// Renamed from "dashboards": the old local shadowed the imported
			// dashboards package inside this closure.
			var dashboardDTOs []metrics_explorer.Dashboard
			err = json.Unmarshal(jsonData, &dashboardDTOs)
			if err != nil {
				zap.L().Error("Error unmarshalling data:", zap.Error(err))
				return &model.ApiError{Typ: "UnMarshallingErr", Err: err}
			}
			metricDetailsDTO.Dashboards = dashboardDTOs
		}
		return nil
	})
	// Wait for all goroutines and handle any errors
	if err := g.Wait(); err != nil {
		// Type assert to check if it's already an ApiError
		if apiErr, ok := err.(*model.ApiError); ok {
			return metrics_explorer.MetricDetailsDTO{}, apiErr
		}
		// If it's not an ApiError, wrap it in one
		return metrics_explorer.MetricDetailsDTO{}, &model.ApiError{Typ: "InternalError", Err: err}
	}
	return metricDetailsDTO, nil
}
// ListMetricsWithSummary returns the paginated metrics summary list by
// delegating directly to the reader's ListSummaryMetrics implementation.
func (receiver *SummaryService) ListMetricsWithSummary(ctx context.Context, params *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) {
	return receiver.reader.ListSummaryMetrics(ctx, params)
}
// GetMetricsTreemap builds the treemap payload for the requested mode,
// delegating to the reader for either the time-series percentage or the
// samples percentage breakdown.
// NOTE(review): an unrecognized params.Treemap yields (nil, nil) — the
// caller then receives neither data nor an error; confirm this is intended.
func (receiver *SummaryService) GetMetricsTreemap(ctx context.Context, params *metrics_explorer.TreeMapMetricsRequest) (*metrics_explorer.TreeMap, *model.ApiError) {
	treemap := metrics_explorer.TreeMap{}
	switch params.Treemap {
	case metrics_explorer.TimeSeriesTeeMap:
		items, err := receiver.reader.GetMetricsTimeSeriesPercentage(ctx, params)
		if err != nil {
			return nil, err
		}
		treemap.TimeSeries = *items
	case metrics_explorer.SamplesTreeMap:
		items, err := receiver.reader.GetMetricsSamplesPercentage(ctx, params)
		if err != nil {
			return nil, err
		}
		treemap.Samples = *items
	default:
		return nil, nil
	}
	return &treemap, nil
}

View File

@ -316,6 +316,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
api.RegisterQueryRangeV4Routes(r, am)
api.RegisterMessagingQueuesRoutes(r, am)
api.RegisterThirdPartyApiRoutes(r, am)
api.MetricExplorerRoutes(r, am)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},

View File

@ -0,0 +1,104 @@
package app
import (
"bytes"
"io"
"net/http"
"github.com/gorilla/mux"
"go.signoz.io/signoz/pkg/query-service/model"
explorer "go.signoz.io/signoz/pkg/query-service/app/metricsexplorer"
"go.uber.org/zap"
)
// FilterKeysSuggestion handles the metrics-explorer filter key suggestion
// endpoint: it parses the request, asks the summary service for matching
// filter keys, and writes them back as the response.
func (aH *APIHandler) FilterKeysSuggestion(w http.ResponseWriter, r *http.Request) {
	// Re-buffer the request body so downstream parsing can read it again.
	raw, _ := io.ReadAll(r.Body)
	r.Body = io.NopCloser(bytes.NewBuffer(raw))

	params, apiError := explorer.ParseFilterKeySuggestions(r)
	if apiError != nil {
		zap.L().Error("error parsing summary filter keys request", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}

	keys, apiError := aH.SummaryService.FilterKeys(r.Context(), params)
	if apiError != nil {
		zap.L().Error("error getting filter keys", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}
	aH.Respond(w, keys)
}
// FilterValuesSuggestion handles the metrics-explorer filter value
// suggestion endpoint: it parses the request, fetches matching filter
// values from the summary service, and writes them back as the response.
func (aH *APIHandler) FilterValuesSuggestion(w http.ResponseWriter, r *http.Request) {
	// Re-buffer the request body so downstream parsing can read it again.
	raw, _ := io.ReadAll(r.Body)
	r.Body = io.NopCloser(bytes.NewBuffer(raw))

	params, apiError := explorer.ParseFilterValueSuggestions(r)
	if apiError != nil {
		zap.L().Error("error parsing summary filter values request", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}

	values, apiError := aH.SummaryService.FilterValues(r.Context(), params)
	if apiError != nil {
		zap.L().Error("error getting filter values", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}
	aH.Respond(w, values)
}
// GetMetricsDetails returns the detailed summary (metadata, sample and
// series counts, attributes, dashboards) for the single metric named by
// the "metric_name" path variable.
func (aH *APIHandler) GetMetricsDetails(w http.ResponseWriter, r *http.Request) {
	metricName := mux.Vars(r)["metric_name"]
	ctx := r.Context()
	metricsDetail, apiError := aH.SummaryService.GetMetricsSummary(ctx, metricName)
	if apiError != nil {
		// Fixed log message: the error comes from fetching the metric
		// summary, not from parsing query-range params.
		zap.L().Error("error getting metrics summary", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}
	aH.Respond(w, metricsDetail)
}
// ListMetrics serves the metrics-explorer summary list endpoint: it parses
// the paginated list request from the body and responds with per-metric
// summary rows from the summary service.
func (aH *APIHandler) ListMetrics(w http.ResponseWriter, r *http.Request) {
	bodyBytes, _ := io.ReadAll(r.Body)
	r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
	ctx := r.Context()
	params, apiError := explorer.ParseSummaryListMetricsParams(r)
	if apiError != nil {
		zap.L().Error("error parsing metric list metric summary api request", zap.Error(apiError.Err))
		// apiError is already a *model.ApiError; respond with it directly
		// instead of re-wrapping it in model.BadRequest (matches the other
		// handlers in this file).
		RespondError(w, apiError, nil)
		return
	}
	slmr, apiErr := aH.SummaryService.ListMetricsWithSummary(ctx, params)
	if apiErr != nil {
		zap.L().Error("error listing metrics with summary", zap.Error(apiErr.Err))
		// Bug fix: this path previously passed the nil apiError to
		// RespondError instead of apiErr, so clients got no error details.
		RespondError(w, apiErr, nil)
		return
	}
	aH.Respond(w, slmr)
}
// GetTreeMap serves the metrics-explorer treemap endpoint: it parses the
// treemap request from the body and responds with the computed treemap
// (time-series or samples percentages).
func (aH *APIHandler) GetTreeMap(w http.ResponseWriter, r *http.Request) {
	bodyBytes, _ := io.ReadAll(r.Body)
	r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
	ctx := r.Context()
	params, apiError := explorer.ParseTreeMapMetricsParams(r)
	if apiError != nil {
		// Fixed log message: this parses treemap params, not query-range params.
		zap.L().Error("error parsing metric treemap params", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}
	result, apiError := aH.SummaryService.GetMetricsTreemap(ctx, params)
	if apiError != nil {
		// Fixed log message: this endpoint serves treemap data, not heatmap data.
		zap.L().Error("error getting treemap data", zap.Error(apiError.Err))
		RespondError(w, apiError, nil)
		return
	}
	aH.Respond(w, result)
}

View File

@ -85,6 +85,8 @@ var TimestampSortFeature = GetOrDefaultEnv("TIMESTAMP_SORT_FEATURE", "true")
var PreferRPMFeature = GetOrDefaultEnv("PREFER_RPM_FEATURE", "false")
var MetricsExplorerClickhouseThreads = GetOrDefaultEnvInt("METRICS_EXPLORER_CLICKHOUSE_THREADS", 8)
// TODO(srikanthccv): remove after backfilling is done
func UseMetricsPreAggregation() bool {
return GetOrDefaultEnv("USE_METRICS_PRE_AGGREGATION", "true") == "true"
@ -231,6 +233,9 @@ const (
SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME = "time_series_v4_1week"
SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day"
SIGNOZ_TOP_LEVEL_OPERATIONS_TABLENAME = "distributed_top_level_operations"
SIGNOZ_TIMESERIES_v4_TABLENAME = "distributed_time_series_v4"
SIGNOZ_TIMESERIES_v4_1WEEK_TABLENAME = "distributed_time_series_v4_1week"
SIGNOZ_TIMESERIES_v4_6HRS_TABLENAME = "distributed_time_series_v4_6hrs"
)
// alert related constants

View File

@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
"go.signoz.io/signoz/pkg/query-service/model"
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/querycache"
)
@ -115,6 +116,21 @@ type Reader interface {
//trace
GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError
GetAllMetricFilterAttributeValues(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError)
GetAllMetricFilterUnits(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError)
GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError)
GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError)
GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError)
GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError)
GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError)
GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError)
ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError)
GetMetricsTimeSeriesPercentage(ctx context.Context, request *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError)
GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError)
}
type Querier interface {

View File

@ -0,0 +1,124 @@
package metrics_explorer
import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// SummaryListMetricsRequest is the request body for the metrics summary
// list endpoint: a time range, filters, ordering, and offset/limit paging.
type SummaryListMetricsRequest struct {
	Offset  int        `json:"offset"`
	Limit   int        `json:"limit"`
	OrderBy v3.OrderBy `json:"orderBy"`
	Start   int64      `json:"start"`
	// EndD holds the range end; it serializes as "end".
	EndD    int64        `json:"end"`
	Filters v3.FilterSet `json:"filters"`
}

// TreeMapType selects which treemap variant to compute.
type TreeMapType string

const (
	// TimeSeriesTeeMap requests the time-series-percentage treemap.
	// NOTE(review): "TeeMap" looks like a typo for "TreeMap"; renaming
	// would touch all callers, so it is only flagged here.
	TimeSeriesTeeMap TreeMapType = "timeseries"
	// SamplesTreeMap requests the samples-percentage treemap.
	SamplesTreeMap TreeMapType = "samples"
)

// TreeMapMetricsRequest is the request body for the treemap endpoint.
type TreeMapMetricsRequest struct {
	Limit   int         `json:"limit"`
	Treemap TreeMapType `json:"treemap"`
	Start   int64       `json:"start"`
	// EndD holds the range end; it serializes as "end".
	EndD    int64        `json:"end"`
	Filters v3.FilterSet `json:"filters"`
}
// MetricDetail is one row of the metrics summary list response.
type MetricDetail struct {
	MetricName   string `json:"metric_name"`
	Description  string `json:"description"`
	Type         string `json:"type"`
	Unit         string `json:"unit"`
	TimeSeries   uint64 `json:"timeseries"`
	Samples      uint64 `json:"samples"`
	LastReceived int64  `json:"lastReceived"`
}

// TreeMapResponseItem is a single metric's share within a treemap.
type TreeMapResponseItem struct {
	Percentage float64 `json:"percentage"`
	TotalValue uint64  `json:"total_value"`
	MetricName string  `json:"metric_name"`
}

// TreeMap is the treemap endpoint response; the service populates only
// the slice matching the requested TreeMapType.
type TreeMap struct {
	TimeSeries []TreeMapResponseItem `json:"timeseries"`
	Samples    []TreeMapResponseItem `json:"samples"`
}

// SummaryListMetricsResponse is the paginated metrics summary list payload.
type SummaryListMetricsResponse struct {
	Metrics []MetricDetail `json:"metrics"`
	Total   uint64         `json:"total"`
}

// Attribute is one label key observed on a metric together with its
// distinct values and their count.
type Attribute struct {
	Key        string   `json:"key" db:"key"`
	Value      []string `json:"value" db:"value"`
	ValueCount uint64   `json:"valueCount" db:"valueCount"`
}
// Metadata holds additional information about the metric.
type Metadata struct {
	MetricType  string `json:"metric_type"`
	Description string `json:"description"`
	Unit        string `json:"unit"`
}

// Alert represents individual alerts associated with the metric.
type Alert struct {
	AlertName string `json:"alert_name"`
	AlertID   string `json:"alert_id"`
}

// Dashboard represents individual dashboards associated with the metric.
type Dashboard struct {
	DashboardName string `json:"dashboard_name"`
	DashboardID   string `json:"dashboard_id"`
	WidgetID      string `json:"widget_id"`
	WidgetName    string `json:"widget_name"`
}

// MetricDetailsDTO is the full single-metric summary returned by the
// metric details endpoint: identity, metadata, sample/series counts,
// observed attributes, and associated alerts/dashboards.
type MetricDetailsDTO struct {
	Name             string      `json:"name"`
	Description      string      `json:"description"`
	Type             string      `json:"type"`
	Unit             string      `json:"unit"`
	Samples          uint64      `json:"samples"`
	TimeSeriesTotal  uint64      `json:"timeSeriesTotal"`
	TimeSeriesActive uint64      `json:"timeSeriesActive"`
	LastReceived     uint64      `json:"lastReceived"`
	Attributes       []Attribute `json:"attributes"`
	Metadata         Metadata    `json:"metadata"`
	Alerts           []Alert     `json:"alerts"`
	Dashboards       []Dashboard `json:"dashboards"`
}
// FilterKeyRequest is the filter-key suggestion request: a free-text
// search string and a result limit.
type FilterKeyRequest struct {
	SearchText string `json:"searchText"`
	Limit      int    `json:"limit"`
}

// FilterValueRequest is the filter-value suggestion request for one key.
type FilterValueRequest struct {
	FilterKey                  string                  `json:"filterKey"`
	FilterAttributeKeyDataType v3.AttributeKeyDataType `json:"filterAttributeKeyDataType"`
	SearchText                 string                  `json:"searchText"`
	Limit                      int                     `json:"limit"`
}

// FilterValueResponse lists the suggested values for a filter key.
type FilterValueResponse struct {
	FilterValues []string `json:"filterValues"`
}

// FilterKeyResponse lists suggested metric columns and attribute keys.
type FilterKeyResponse struct {
	MetricColumns []string          `json:"metricColumns"`
	AttributeKeys []v3.AttributeKey `json:"attributeKeys"`
}

// AvailableColumnFilterMap names the filter keys that map to real table
// columns; any other key is resolved as a JSON label lookup (see
// BuildFilterConditions in the utils package).
var AvailableColumnFilterMap = map[string]bool{
	"metric_name": true,
	"metric_unit": true,
	"metric_type": true,
}

View File

@ -0,0 +1,153 @@
package utils
import (
"fmt"
"strings"
"time"
"go.signoz.io/signoz/pkg/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/model/metrics_explorer"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// BuildFilterConditions converts a v3 filter set into ClickHouse WHERE
// fragments, one per filter item. skipKey, when non-empty, names a filter
// key to leave out. Keys listed in AvailableColumnFilterMap are treated
// as real table columns; every other key is looked up inside the JSON
// `labels` blob. A nil or empty filter set yields (nil, nil).
//
// NOTE(review): filter keys are interpolated into the SQL string by
// buildSingleFilterCondition without escaping — confirm keys are
// validated upstream before this is used with untrusted input.
func BuildFilterConditions(fs *v3.FilterSet, skipKey string) ([]string, error) {
	if fs == nil || len(fs.Items) == 0 {
		return nil, nil
	}
	var conditions []string
	for _, item := range fs.Items {
		if skipKey != "" && item.Key.Key == skipKey {
			continue
		}
		toFormat := item.Value
		op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator))))
		// contains/ncontains are implemented with LIKE, so wrap the value
		// in SQL wildcards before ClickHouse formatting.
		if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains {
			toFormat = fmt.Sprintf("%%%s%%", toFormat)
		}
		fmtVal := ClickHouseFormattedValue(toFormat)
		// Idiom fix: replace the verbose if/else with a comma-ok lookup —
		// anything that is not a known column filter is a JSON label key.
		_, isColumn := metrics_explorer.AvailableColumnFilterMap[item.Key.Key]
		condition, err := buildSingleFilterCondition(item.Key.Key, op, fmtVal, !isColumn)
		if err != nil {
			return nil, err
		}
		conditions = append(conditions, condition)
	}
	return conditions, nil
}
// buildSingleFilterCondition renders one filter item as a ClickHouse
// predicate. isJSONKey selects between a JSONExtractString lookup on the
// labels blob and a direct column reference; fmtVal must already be
// ClickHouse-escaped by the caller. Unsupported operators return an error.
func buildSingleFilterCondition(key string, op v3.FilterOperator, fmtVal string, isJSONKey bool) (string, error) {
	var keyCondition string
	if isJSONKey {
		keyCondition = fmt.Sprintf("JSONExtractString(labels, '%s')", key)
	} else {
		// Map the public filter names onto the actual column names.
		switch key {
		case "metric_unit":
			key = "unit"
		case "metric_type":
			key = "type"
		}
		keyCondition = key
	}

	switch op {
	case v3.FilterOperatorEqual:
		return fmt.Sprintf("%s = %s", keyCondition, fmtVal), nil
	case v3.FilterOperatorNotEqual:
		return fmt.Sprintf("%s != %s", keyCondition, fmtVal), nil
	case v3.FilterOperatorIn:
		return fmt.Sprintf("%s IN %s", keyCondition, fmtVal), nil
	case v3.FilterOperatorNotIn:
		return fmt.Sprintf("%s NOT IN %s", keyCondition, fmtVal), nil
	case v3.FilterOperatorLike, v3.FilterOperatorContains:
		// contains values are pre-wrapped in %...% by the caller, so both
		// operators lower to the same like() predicate.
		return fmt.Sprintf("like(%s, %s)", keyCondition, fmtVal), nil
	case v3.FilterOperatorNotLike, v3.FilterOperatorNotContains:
		return fmt.Sprintf("notLike(%s, %s)", keyCondition, fmtVal), nil
	case v3.FilterOperatorRegex:
		return fmt.Sprintf("match(%s, %s)", keyCondition, fmtVal), nil
	case v3.FilterOperatorNotRegex:
		return fmt.Sprintf("not match(%s, %s)", keyCondition, fmtVal), nil
	case v3.FilterOperatorGreaterThan:
		return fmt.Sprintf("%s > %s", keyCondition, fmtVal), nil
	case v3.FilterOperatorGreaterThanOrEq:
		return fmt.Sprintf("%s >= %s", keyCondition, fmtVal), nil
	case v3.FilterOperatorLessThan:
		return fmt.Sprintf("%s < %s", keyCondition, fmtVal), nil
	case v3.FilterOperatorLessThanOrEq:
		return fmt.Sprintf("%s <= %s", keyCondition, fmtVal), nil
	case v3.FilterOperatorExists:
		return fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", key), nil
	case v3.FilterOperatorNotExists:
		return fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", key), nil
	default:
		return "", fmt.Errorf("unsupported filter operator: %s", op)
	}
}
// Millisecond spans used to pick the coarsest rollup table that still
// covers the requested query range.
var (
	sixHoursInMilliseconds = time.Hour.Milliseconds() * 6
	oneDayInMilliseconds   = time.Hour.Milliseconds() * 24
	oneWeekInMilliseconds  = oneDayInMilliseconds * 7
)
// WhichTSTableToUse picks the time-series table pair (distributed and
// local) whose rollup granularity matches the [start, end] window, and
// snaps start down to that granularity boundary. It returns the adjusted
// start, the unchanged end, and the two table names.
func WhichTSTableToUse(start, end int64) (int64, int64, string, string) {
	var tableName, localTableName string
	window := end - start
	switch {
	case window < sixHoursInMilliseconds:
		// Under 6h: raw table, start snapped to the hour.
		start -= start % time.Hour.Milliseconds()
		tableName = constants.SIGNOZ_TIMESERIES_v4_TABLENAME
		localTableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME
	case window < oneDayInMilliseconds:
		// Under 24h: 6-hour rollup, start snapped to 6h.
		start -= start % (time.Hour.Milliseconds() * 6)
		tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_TABLENAME
		localTableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME
	case window < oneWeekInMilliseconds:
		// Under 7d: 1-day rollup, start snapped to the day.
		start -= start % oneDayInMilliseconds
		tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME
		localTableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
	case constants.UseMetricsPreAggregation():
		// 7d or more with pre-aggregation enabled: weekly rollup.
		start -= start % oneWeekInMilliseconds
		tableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_TABLENAME
		localTableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME
	default:
		// 7d or more without pre-aggregation: stay on the 1-day rollup.
		start -= start % oneDayInMilliseconds
		tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME
		localTableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME
	}
	return start, end, tableName, localTableName
}
// WhichSampleTableToUse picks the samples table and the matching count
// expression for the given range: raw rows under a day, 5-minute
// aggregates under a week, 30-minute aggregates beyond that.
func WhichSampleTableToUse(start, end int64) (string, string) {
	window := end - start
	switch {
	case window < oneDayInMilliseconds:
		return constants.SIGNOZ_SAMPLES_V4_TABLENAME, "count(*)"
	case window < oneWeekInMilliseconds:
		return constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, "sum(count)"
	default:
		return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME, "sum(count)"
	}
}

View File

@ -63,6 +63,10 @@ func (h *provider) clickHouseSettings(ctx context.Context, query string, args ..
settings["timeout_before_checking_execution_speed"] = h.settings.TimeoutBeforeCheckingExecutionSpeed
}
if ctx.Value("clickhouse_max_threads") != nil {
if maxThreads, ok := ctx.Value("clickhouse_max_threads").(int); ok { settings["max_threads"] = maxThreads }
}
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx, query, args
}