diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 740278c40b..685c7910b5 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -375,6 +375,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h apiHandler.RegisterWebSocketPaths(r, am) apiHandler.RegisterMessagingQueuesRoutes(r, am) apiHandler.RegisterThirdPartyApiRoutes(r, am) + apiHandler.MetricExplorerRoutes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/go.mod b/go.mod index 8345fb98fe..689109eba2 100644 --- a/go.mod +++ b/go.mod @@ -72,6 +72,7 @@ require ( golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 golang.org/x/net v0.33.0 golang.org/x/oauth2 v0.24.0 + golang.org/x/sync v0.10.0 golang.org/x/text v0.21.0 google.golang.org/grpc v1.67.1 google.golang.org/protobuf v1.35.2 @@ -263,7 +264,6 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/mod v0.22.0 // indirect - golang.org/x/sync v0.10.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/time v0.6.0 // indirect golang.org/x/tools v0.28.0 // indirect diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 970872b07d..c975258650 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "go.signoz.io/signoz/pkg/query-service/model/metrics_explorer" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" @@ -1141,7 +1143,7 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, - levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) 
(*[]model.SearchSpansResult, error) { + levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) { searchSpansResult := []model.SearchSpansResult{ { Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"}, @@ -1289,7 +1291,7 @@ func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.Sea func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, - levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) { + levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) { if r.useTraceNewSchema { return r.SearchTracesV2(ctx, params, smartTraceAlgorithm) @@ -5594,3 +5596,547 @@ func (r *ClickHouseReader) SubscribeToQueryProgress( ) (<-chan model.QueryProgress, func(), *model.ApiError) { return r.queryProgressTracker.SubscribeToQueryProgress(queryId) } + +func (r *ClickHouseReader) GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError) { + var rows driver.Rows + var response []v3.AttributeKey + query := fmt.Sprintf("SELECT arrayJoin(tagKeys) AS distinctTagKey FROM (SELECT JSONExtractKeys(labels) AS tagKeys FROM %s.%s WHERE unix_milli >= $1 GROUP BY tagKeys) WHERE distinctTagKey ILIKE $2 AND distinctTagKey NOT LIKE '\\_\\_%%' GROUP BY distinctTagKey", signozMetricDBName, signozTSTableNameV41Day) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + rows, err := r.db.Query(ctx, query, common.PastDayRoundOff(), fmt.Sprintf("%%%s%%", req.SearchText)) + if err != nil { + zap.L().Error("Error while 
executing query", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + var attributeKey string + for rows.Next() { + if err := rows.Scan(&attributeKey); err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + if skipDotNames && strings.Contains(attributeKey, ".") { + continue + } + key := v3.AttributeKey{ + Key: attributeKey, + DataType: v3.AttributeKeyDataTypeString, // https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto#L64-L72. + Type: v3.AttributeKeyTypeTag, + IsColumn: false, + } + response = append(response, key) + } + return &response, nil +} + +func (r *ClickHouseReader) GetAllMetricFilterAttributeValues(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) { + var query string + var err error + var rows driver.Rows + var attributeValues []string + + query = fmt.Sprintf("SELECT JSONExtractString(labels, $1) AS tagValue FROM %s.%s WHERE JSONExtractString(labels, $2) ILIKE $3 AND unix_milli >= $4 GROUP BY tagValue", signozMetricDBName, signozTSTableNameV41Day) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + rows, err = r.db.Query(ctx, query, req.FilterKey, req.FilterKey, fmt.Sprintf("%%%s%%", req.SearchText), common.PastDayRoundOff()) + + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + defer rows.Close() + + var atrributeValue string + for rows.Next() { + if err := rows.Scan(&atrributeValue); err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + attributeValues = append(attributeValues, atrributeValue) + } + return attributeValues, nil +} + +func (r *ClickHouseReader) GetAllMetricFilterUnits(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) { + var rows driver.Rows + var response []string + query := fmt.Sprintf("SELECT 
DISTINCT unit FROM %s.%s WHERE unit ILIKE $1 AND unit IS NOT NULL ORDER BY unit", signozMetricDBName, signozTSTableNameV41Day) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + + rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText)) + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + var attributeKey string + for rows.Next() { + if err := rows.Scan(&attributeKey); err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + response = append(response, attributeKey) + } + return response, nil +} +func (r *ClickHouseReader) GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) { + var rows driver.Rows + var response []string + query := fmt.Sprintf("SELECT DISTINCT type FROM %s.%s WHERE type ILIKE $1 AND type IS NOT NULL ORDER BY type", signozMetricDBName, signozTSTableNameV41Day) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + + rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText)) + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + var attributeKey string + for rows.Next() { + if err := rows.Scan(&attributeKey); err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + response = append(response, attributeKey) + } + return response, nil +} + +func (r *ClickHouseReader) GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError) { + query := fmt.Sprintf("SELECT COUNT(*) AS data_points, MAX(unix_milli) AS last_received_time FROM %s.%s WHERE metric_name = ?", signozMetricDBName, signozSampleTableName) + var lastRecievedTimestamp int64 // Changed from uint64 to int64 + var dataPoints uint64 + err := 
r.db.QueryRow(ctx, query, metricName).Scan(&dataPoints, &lastRecievedTimestamp) + if err != nil { + return 0, 0, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + return dataPoints, uint64(lastRecievedTimestamp), nil // Convert to uint64 before returning +} + +func (r *ClickHouseReader) GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) { + query := fmt.Sprintf(`SELECT + uniq(fingerprint) AS timeSeriesCount +FROM %s.%s +WHERE metric_name = ?;`, signozMetricDBName, signozTSTableNameV41Week) + var timeSeriesCount uint64 + err := r.db.QueryRow(ctx, query, metricName).Scan(&timeSeriesCount) + if err != nil { + return 0, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + return timeSeriesCount, nil +} + +func (r *ClickHouseReader) GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError) { + query := fmt.Sprintf(` +SELECT + kv.1 AS key, + arrayMap(x -> trim(BOTH '\"' FROM x), groupUniqArray(10000)(kv.2)) AS values, + length(groupUniqArray(10000)(kv.2)) AS valueCount +FROM %s.%s +ARRAY JOIN arrayFilter(x -> NOT startsWith(x.1, '__'), JSONExtractKeysAndValuesRaw(labels)) AS kv +WHERE metric_name = ? 
+GROUP BY kv.1 +ORDER BY valueCount DESC; + `, signozMetricDBName, signozTSTableNameV41Week) + + rows, err := r.db.Query(ctx, query, metricName) + if err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + defer rows.Close() // Ensure the rows are closed + + var attributesList []metrics_explorer.Attribute + for rows.Next() { + var key string + var values []string + var valueCount uint64 + + // Manually scan each value into its corresponding variable + if err := rows.Scan(&key, &values, &valueCount); err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + // Append the scanned values into the struct + attributesList = append(attributesList, metrics_explorer.Attribute{ + Key: key, + Value: values, + ValueCount: valueCount, + }) + } + + // Handle any errors encountered while scanning rows + if err := rows.Err(); err != nil { + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + return &attributesList, nil +} + +func (r *ClickHouseReader) GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError) { + milli := time.Now().Add(-duration).UnixMilli() + query := fmt.Sprintf("SELECT uniq(fingerprint) FROM %s.%s WHERE metric_name = '%s' and unix_milli >= ?", signozMetricDBName, signozTSTableNameV4, metricName) + var timeSeries uint64 + // Using QueryRow instead of Select since we're only expecting a single value + err := r.db.QueryRow(ctx, query, milli).Scan(&timeSeries) + if err != nil { + return 0, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + return timeSeries, nil +} + +func (r *ClickHouseReader) ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) { + var args []interface{} + + conditions, _ := utils.BuildFilterConditions(&req.Filters, "t") + whereClause := "" + if conditions != nil { + whereClause = "AND " + 
strings.Join(conditions, " AND ") + } + + firstQueryLimit := req.Limit + dataPointsOrder := false + var orderByClauseFirstQuery string + if req.OrderBy.ColumnName == "samples" { + dataPointsOrder = true + orderByClauseFirstQuery = fmt.Sprintf("ORDER BY timeseries %s", req.OrderBy.Order) + if req.Limit < 50 { + firstQueryLimit = 50 + } + } else if req.OrderBy.ColumnName == "metric_type" { + orderByClauseFirstQuery = fmt.Sprintf("ORDER BY type %s", req.OrderBy.Order) + } else { + orderByClauseFirstQuery = fmt.Sprintf("ORDER BY %s %s", req.OrderBy.ColumnName, req.OrderBy.Order) + } + + start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.EndD) + sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.EndD) + + metricsQuery := fmt.Sprintf( + `SELECT + t.metric_name AS metric_name, + ANY_VALUE(t.description) AS description, + ANY_VALUE(t.type) AS type, + ANY_VALUE(t.unit), + uniq(t.fingerprint) AS timeseries, + uniq(metric_name) OVER() AS total + FROM %s.%s AS t + WHERE unix_milli BETWEEN ? AND ? + %s + GROUP BY t.metric_name + %s + LIMIT %d OFFSET %d;`, + signozMetricDBName, tsTable, whereClause, orderByClauseFirstQuery, firstQueryLimit, req.Offset) + + args = append(args, start, end) + valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) + rows, err := r.db.Query(valueCtx, metricsQuery, args...) 
+ if err != nil { + zap.L().Error("Error executing metrics query", zap.Error(err)) + return &metrics_explorer.SummaryListMetricsResponse{}, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + defer rows.Close() + + var response metrics_explorer.SummaryListMetricsResponse + var metricNames []string + + for rows.Next() { + var metric metrics_explorer.MetricDetail + if err := rows.Scan(&metric.MetricName, &metric.Description, &metric.Type, &metric.Unit, &metric.TimeSeries, &response.Total); err != nil { + zap.L().Error("Error scanning metric row", zap.Error(err)) + return &response, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + metricNames = append(metricNames, metric.MetricName) + response.Metrics = append(response.Metrics, metric) + } + if err := rows.Err(); err != nil { + zap.L().Error("Error iterating over metric rows", zap.Error(err)) + return &response, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + if len(metricNames) == 0 { + return &response, nil + } + + metricsList := "'" + strings.Join(metricNames, "', '") + "'" + if dataPointsOrder { + orderByClauseFirstQuery = fmt.Sprintf("ORDER BY s.samples %s", req.OrderBy.Order) + } else { + orderByClauseFirstQuery = "" + } + + sampleQuery := fmt.Sprintf( + `SELECT + s.samples, + s.metric_name, + s.unix_milli AS lastReceived + FROM ( + SELECT + metric_name, + %s AS samples, + max(unix_milli) as unix_milli + FROM %s.%s + WHERE fingerprint IN ( + SELECT fingerprint + FROM %s.%s + WHERE unix_milli BETWEEN ? AND ? + %s + AND metric_name IN (%s) + GROUP BY fingerprint + ) + AND metric_name in (%s) + GROUP BY metric_name + ) AS s + %s + LIMIT %d OFFSET %d;`, + countExp, signozMetricDBName, sampleTable, signozMetricDBName, localTsTable, + whereClause, metricsList, metricsList, orderByClauseFirstQuery, + req.Limit, req.Offset) + + args = append(args, start, end) + rows, err = r.db.Query(valueCtx, sampleQuery, args...) 
+ if err != nil { + zap.L().Error("Error executing samples query", zap.Error(err)) + return &response, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + defer rows.Close() + + samplesMap := make(map[string]uint64) + lastReceivedMap := make(map[string]int64) + + for rows.Next() { + var samples uint64 + var metricName string + var lastReceived int64 + if err := rows.Scan(&samples, &metricName, &lastReceived); err != nil { + zap.L().Error("Error scanning sample row", zap.Error(err)) + return &response, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + samplesMap[metricName] = samples + lastReceivedMap[metricName] = lastReceived + } + if err := rows.Err(); err != nil { + zap.L().Error("Error iterating over sample rows", zap.Error(err)) + return &response, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + var filteredMetrics []metrics_explorer.MetricDetail + for i := range response.Metrics { + if samples, exists := samplesMap[response.Metrics[i].MetricName]; exists { + response.Metrics[i].Samples = samples + if lastReceived, exists := lastReceivedMap[response.Metrics[i].MetricName]; exists { + response.Metrics[i].LastReceived = lastReceived + } + filteredMetrics = append(filteredMetrics, response.Metrics[i]) + } + } + response.Metrics = filteredMetrics + + if dataPointsOrder { + sort.Slice(response.Metrics, func(i, j int) bool { + return response.Metrics[i].Samples > response.Metrics[j].Samples + }) + } + + return &response, nil +} + +func (r *ClickHouseReader) GetMetricsTimeSeriesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) { + var args []interface{} + + // Build filters dynamically + conditions, _ := utils.BuildFilterConditions(&req.Filters, "") + whereClause := "" + if len(conditions) > 0 { + whereClause = "AND " + strings.Join(conditions, " AND ") + } + start, end, tsTable, _ := utils.WhichTSTableToUse(req.Start, req.EndD) + + // Construct the query without 
backticks + query := fmt.Sprintf(` + SELECT + metric_name, + total_value, + (total_value * 100.0 / total_time_series) AS percentage + FROM ( + SELECT + metric_name, + uniq(fingerprint) AS total_value, + (SELECT uniq(fingerprint) + FROM %s.%s + WHERE unix_milli BETWEEN ? AND ?) AS total_time_series + FROM %s.%s + WHERE unix_milli BETWEEN ? AND ? %s + GROUP BY metric_name + ) + ORDER BY percentage DESC + LIMIT %d;`, + signozMetricDBName, + tsTable, + signozMetricDBName, + tsTable, + whereClause, + req.Limit, + ) + + args = append(args, + start, end, // For total_cardinality subquery + start, end, // For main query + ) + + valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) + rows, err := r.db.Query(valueCtx, query, args...) + if err != nil { + zap.L().Error("Error executing cardinality query", zap.Error(err), zap.String("query", query)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + defer rows.Close() + + var heatmap []metrics_explorer.TreeMapResponseItem + for rows.Next() { + var item metrics_explorer.TreeMapResponseItem + if err := rows.Scan(&item.MetricName, &item.TotalValue, &item.Percentage); err != nil { + zap.L().Error("Error scanning row", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + heatmap = append(heatmap, item) + } + + if err := rows.Err(); err != nil { + zap.L().Error("Error iterating over rows", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + return &heatmap, nil +} + +func (r *ClickHouseReader) GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) { + var args []interface{} + + // Build the filter conditions + conditions, _ := utils.BuildFilterConditions(&req.Filters, "t") + whereClause := "" + if conditions != nil { + whereClause = "AND " + strings.Join(conditions, " AND ") + } + + // Determine time 
range and tables to use + start, end, tsTable, localTsTable := utils.WhichTSTableToUse(req.Start, req.EndD) + sampleTable, countExp := utils.WhichSampleTableToUse(req.Start, req.EndD) + + // Construct the metrics query + queryLimit := 50 + req.Limit + metricsQuery := fmt.Sprintf( + `SELECT + t.metric_name AS metric_name, + uniq(t.fingerprint) AS timeSeries + FROM %s.%s AS t + WHERE unix_milli BETWEEN ? AND ? + %s + GROUP BY t.metric_name + ORDER BY timeSeries DESC + LIMIT %d;`, + signozMetricDBName, tsTable, whereClause, queryLimit, + ) + + args = append(args, start, end) + valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads) + + // Execute the metrics query + rows, err := r.db.Query(valueCtx, metricsQuery, args...) + if err != nil { + zap.L().Error("Error executing metrics query", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + defer rows.Close() + + // Process the query results + var metricNames []string + for rows.Next() { + var metricName string + var timeSeries uint64 + if err := rows.Scan(&metricName, &timeSeries); err != nil { + zap.L().Error("Error scanning metric row", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + metricNames = append(metricNames, metricName) + } + if err := rows.Err(); err != nil { + zap.L().Error("Error iterating over metric rows", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + // If no metrics found, return early + if len(metricNames) == 0 { + return nil, nil + } + + // Format metric names for query + metricsList := "'" + strings.Join(metricNames, "', '") + "'" + + // Construct the sample percentage query + sampleQuery := fmt.Sprintf( + `WITH TotalSamples AS ( + SELECT %s AS total_samples + FROM %s.%s + WHERE unix_milli BETWEEN ? AND ? 
+ ) + SELECT + s.samples, + s.metric_name, + COALESCE((s.samples * 100.0 / t.total_samples), 0) AS percentage + FROM + ( + SELECT + metric_name, + %s AS samples + FROM %s.%s + WHERE fingerprint IN + ( + SELECT fingerprint + FROM %s.%s + WHERE unix_milli BETWEEN ? AND ? + %s + AND metric_name IN (%s) + GROUP BY fingerprint + ) + AND metric_name IN (%s) + GROUP BY metric_name + ) AS s + JOIN TotalSamples t ON 1 = 1 + ORDER BY percentage DESC + LIMIT %d;`, + countExp, signozMetricDBName, sampleTable, // Total samples + countExp, signozMetricDBName, sampleTable, // Inner select samples + signozMetricDBName, localTsTable, whereClause, metricsList, // Subquery conditions + metricsList, req.Limit, // Final conditions + ) + + args = append(args, start, end) + + // Execute the sample percentage query + rows, err = r.db.Query(valueCtx, sampleQuery, args...) + if err != nil { + zap.L().Error("Error executing samples query", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + defer rows.Close() + + // Process the results into a response slice + var heatmap []metrics_explorer.TreeMapResponseItem + for rows.Next() { + var item metrics_explorer.TreeMapResponseItem + if err := rows.Scan(&item.TotalValue, &item.MetricName, &item.Percentage); err != nil { + zap.L().Error("Error scanning row", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + heatmap = append(heatmap, item) + } + if err := rows.Err(); err != nil { + zap.L().Error("Error iterating over sample rows", zap.Error(err)) + return nil, &model.ApiError{Typ: "ClickHouseError", Err: err} + } + + return &heatmap, nil +} diff --git a/pkg/query-service/app/dashboards/model.go b/pkg/query-service/app/dashboards/model.go index 90c9943708..d25c43fe15 100644 --- a/pkg/query-service/app/dashboards/model.go +++ b/pkg/query-service/app/dashboards/model.go @@ -537,3 +537,87 @@ func countPanelsInDashboard(inputData map[string]interface{}) model.DashboardsIn 
LogsPanelsWithAttrContainsOp: logsPanelsWithAttrContains, } } + +func GetDashboardsWithMetricName(ctx context.Context, metricName string) ([]map[string]string, *model.ApiError) { + // Get all dashboards first + query := `SELECT uuid, data FROM dashboards` + + type dashboardRow struct { + Uuid string `db:"uuid"` + Data json.RawMessage `db:"data"` + } + + var dashboards []dashboardRow + err := db.Select(&dashboards, query) + if err != nil { + zap.L().Error("Error in getting dashboards", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorExec, Err: err} + } + + // Process the JSON data in Go + var result []map[string]string + for _, dashboard := range dashboards { + var dashData map[string]interface{} + if err := json.Unmarshal(dashboard.Data, &dashData); err != nil { + continue + } + + dashTitle, _ := dashData["title"].(string) + widgets, ok := dashData["widgets"].([]interface{}) + if !ok { + continue + } + + for _, w := range widgets { + widget, ok := w.(map[string]interface{}) + if !ok { + continue + } + + widgetTitle, _ := widget["title"].(string) + widgetID, _ := widget["id"].(string) + + query, ok := widget["query"].(map[string]interface{}) + if !ok { + continue + } + + builder, ok := query["builder"].(map[string]interface{}) + if !ok { + continue + } + + queryData, ok := builder["queryData"].([]interface{}) + if !ok { + continue + } + + for _, qd := range queryData { + data, ok := qd.(map[string]interface{}) + if !ok { + continue + } + + if dataSource, ok := data["dataSource"].(string); !ok || dataSource != "metrics" { + continue + } + + aggregateAttr, ok := data["aggregateAttribute"].(map[string]interface{}) + if !ok { + continue + } + + if key, ok := aggregateAttr["key"].(string); ok && strings.TrimSpace(key) == metricName { + result = append(result, map[string]string{ + "dashboard_id": dashboard.Uuid, + "widget_title": widgetTitle, + "widget_id": widgetID, + "dashboard_title": dashTitle, + }) + } + } + } + } + + return result, nil +} diff --git 
a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 0527886850..4a9a62b9b1 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "go.signoz.io/signoz/pkg/query-service/app/metricsexplorer" "io" "math" "net/http" @@ -127,6 +128,8 @@ type APIHandler struct { statefulsetsRepo *inframetrics.StatefulSetsRepo jobsRepo *inframetrics.JobsRepo + SummaryService *metricsexplorer.SummaryService + pvcsRepo *inframetrics.PvcsRepo JWT *authtypes.JWT @@ -215,6 +218,8 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2) jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2) pvcsRepo := inframetrics.NewPvcsRepo(opts.Reader, querierv2) + //explorerCache := metricsexplorer.NewExplorerCache(metricsexplorer.WithCache(opts.Cache)) + summaryService := metricsexplorer.NewSummaryService(opts.Reader, querierv2) aH := &APIHandler{ reader: opts.Reader, @@ -244,6 +249,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { jobsRepo: jobsRepo, pvcsRepo: pvcsRepo, JWT: opts.JWT, + SummaryService: summaryService, } logsQueryBuilder := logsv3.PrepareLogsQuery @@ -606,6 +612,24 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/changePassword/{id}", am.SelfAccess(aH.changePassword)).Methods(http.MethodPost) } +func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *AuthMiddleware) { + router.HandleFunc("/api/v1/metrics/filters/keys", + am.ViewAccess(ah.FilterKeysSuggestion)). + Methods(http.MethodGet) + router.HandleFunc("/api/v1/metrics/filters/values", + am.ViewAccess(ah.FilterValuesSuggestion)). + Methods(http.MethodPost) + router.HandleFunc("/api/v1/metrics/{metric_name}/metadata", + am.ViewAccess(ah.GetMetricsDetails)). 
+ Methods(http.MethodGet) + router.HandleFunc("/api/v1/metrics", + am.ViewAccess(ah.ListMetrics)). + Methods(http.MethodPost) + router.HandleFunc("/api/v1/metrics/treemap", + am.ViewAccess(ah.GetTreeMap)). + Methods(http.MethodPost) +} + func Intersection(a, b []int) (c []int) { m := make(map[int]bool) diff --git a/pkg/query-service/app/metricsexplorer/parser.go b/pkg/query-service/app/metricsexplorer/parser.go new file mode 100644 index 0000000000..17e01c8f4c --- /dev/null +++ b/pkg/query-service/app/metricsexplorer/parser.go @@ -0,0 +1,70 @@ +package metricsexplorer + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + + "go.signoz.io/signoz/pkg/query-service/model" + "go.signoz.io/signoz/pkg/query-service/model/metrics_explorer" +) + +func ParseFilterKeySuggestions(r *http.Request) (*metrics_explorer.FilterKeyRequest, *model.ApiError) { + + searchText := r.URL.Query().Get("searchText") + limit, err := strconv.Atoi(r.URL.Query().Get("limit")) + if err != nil { + limit = 50 + } + + return &metrics_explorer.FilterKeyRequest{Limit: limit, SearchText: searchText}, nil +} + +func ParseFilterValueSuggestions(r *http.Request) (*metrics_explorer.FilterValueRequest, *model.ApiError) { + var filterValueRequest metrics_explorer.FilterValueRequest + + // parse the request body + if err := json.NewDecoder(r.Body).Decode(&filterValueRequest); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} + } + + return &filterValueRequest, nil +} + +func ParseSummaryListMetricsParams(r *http.Request) (*metrics_explorer.SummaryListMetricsRequest, *model.ApiError) { + var listMetricsParams *metrics_explorer.SummaryListMetricsRequest + + // parse the request body + if err := json.NewDecoder(r.Body).Decode(&listMetricsParams); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} 
+ } + + if listMetricsParams.OrderBy.ColumnName == "" || listMetricsParams.OrderBy.Order == "" { + listMetricsParams.OrderBy.ColumnName = "timeseries" // DEFAULT ORDER BY + listMetricsParams.OrderBy.Order = v3.DirectionDesc + } + + if listMetricsParams.Limit == 0 { + listMetricsParams.Limit = 10 // DEFAULT LIMIT + } + + return listMetricsParams, nil +} + +func ParseTreeMapMetricsParams(r *http.Request) (*metrics_explorer.TreeMapMetricsRequest, *model.ApiError) { + var treeMapMetricParams *metrics_explorer.TreeMapMetricsRequest + + // parse the request body + if err := json.NewDecoder(r.Body).Decode(&treeMapMetricParams); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} + } + + if treeMapMetricParams.Limit == 0 { + treeMapMetricParams.Limit = 10 + } + + return treeMapMetricParams, nil +} diff --git a/pkg/query-service/app/metricsexplorer/summary.go b/pkg/query-service/app/metricsexplorer/summary.go new file mode 100644 index 0000000000..f8f511f53f --- /dev/null +++ b/pkg/query-service/app/metricsexplorer/summary.go @@ -0,0 +1,209 @@ +package metricsexplorer + +import ( + "context" + "encoding/json" + "time" + + "go.uber.org/zap" + + "go.signoz.io/signoz/pkg/query-service/app/dashboards" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + "go.signoz.io/signoz/pkg/query-service/model/metrics_explorer" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "golang.org/x/sync/errgroup" +) + +type SummaryService struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +func NewSummaryService(reader interfaces.Reader, querierV2 interfaces.Querier) *SummaryService { + return &SummaryService{reader: reader, querierV2: querierV2} +} + +func (receiver *SummaryService) FilterKeys(ctx context.Context, params *metrics_explorer.FilterKeyRequest) (*metrics_explorer.FilterKeyResponse, *model.ApiError) { + var response 
metrics_explorer.FilterKeyResponse + keys, apiError := receiver.reader.GetAllMetricFilterAttributeKeys( + ctx, + params, + true, + ) + if apiError != nil { + return nil, apiError + } + response.AttributeKeys = *keys + var availableColumnFilter []string + for key := range metrics_explorer.AvailableColumnFilterMap { + availableColumnFilter = append(availableColumnFilter, key) + } + response.MetricColumns = availableColumnFilter + return &response, nil +} + +func (receiver *SummaryService) FilterValues(ctx context.Context, params *metrics_explorer.FilterValueRequest) (*metrics_explorer.FilterValueResponse, *model.ApiError) { + var response metrics_explorer.FilterValueResponse + switch params.FilterKey { + case "metric_name": + var filterValues []string + request := v3.AggregateAttributeRequest{DataSource: v3.DataSourceMetrics, SearchText: params.SearchText, Limit: params.Limit} + attributes, err := receiver.reader.GetMetricAggregateAttributes(ctx, &request, true) + if err != nil { + return nil, model.InternalError(err) + } + for _, item := range attributes.AttributeKeys { + filterValues = append(filterValues, item.Key) + } + response.FilterValues = filterValues + return &response, nil + case "metric_unit": + attributes, err := receiver.reader.GetAllMetricFilterUnits(ctx, params) + if err != nil { + return nil, err + } + response.FilterValues = attributes + return &response, nil + case "metric_type": + attributes, err := receiver.reader.GetAllMetricFilterTypes(ctx, params) + if err != nil { + return nil, err + } + response.FilterValues = attributes + return &response, nil + default: + attributes, err := receiver.reader.GetAllMetricFilterAttributeValues(ctx, params) + if err != nil { + return nil, err + } + response.FilterValues = attributes + return &response, nil + } +} + +func (receiver *SummaryService) GetMetricsSummary(ctx context.Context, metricName string) (metrics_explorer.MetricDetailsDTO, *model.ApiError) { + var metricDetailsDTO 
metrics_explorer.MetricDetailsDTO + g, ctx := errgroup.WithContext(ctx) + + // Call 1: GetMetricMetadata + g.Go(func() error { + metadata, err := receiver.reader.GetMetricMetadata(ctx, metricName, metricName) + if err != nil { + return &model.ApiError{Typ: "ClickHouseError", Err: err} + } + metricDetailsDTO.Name = metricName + metricDetailsDTO.Unit = metadata.Unit + metricDetailsDTO.Description = metadata.Description + metricDetailsDTO.Type = metadata.Type + metricDetailsDTO.Metadata.MetricType = metadata.Type + metricDetailsDTO.Metadata.Description = metadata.Description + metricDetailsDTO.Metadata.Unit = metadata.Unit + return nil + }) + + // Call 2: GetMetricsDataPointsAndLastReceived + g.Go(func() error { + dataPoints, lastReceived, err := receiver.reader.GetMetricsDataPointsAndLastReceived(ctx, metricName) + if err != nil { + return err + } + metricDetailsDTO.Samples = dataPoints + metricDetailsDTO.LastReceived = lastReceived + return nil + }) + + // Call 3: GetTotalTimeSeriesForMetricName + g.Go(func() error { + totalSeries, err := receiver.reader.GetTotalTimeSeriesForMetricName(ctx, metricName) + if err != nil { + return err + } + metricDetailsDTO.TimeSeriesTotal = totalSeries + return nil + }) + + // Call 4: GetActiveTimeSeriesForMetricName + g.Go(func() error { + activeSeries, err := receiver.reader.GetActiveTimeSeriesForMetricName(ctx, metricName, 120*time.Minute) + if err != nil { + return err + } + metricDetailsDTO.TimeSeriesActive = activeSeries + return nil + }) + + // Call 5: GetAttributesForMetricName + g.Go(func() error { + attributes, err := receiver.reader.GetAttributesForMetricName(ctx, metricName) + if err != nil { + return err + } + if attributes != nil { + metricDetailsDTO.Attributes = *attributes + } + return nil + }) + + // Call 6: GetDashboardsWithMetricName + g.Go(func() error { + data, err := dashboards.GetDashboardsWithMetricName(ctx, metricName) + if err != nil { + return err + } + if data != nil { + jsonData, err := json.Marshal(data) 
+ if err != nil { + zap.L().Error("Error marshalling data:", zap.Error(err)) + return &model.ApiError{Typ: "MarshallingErr", Err: err} + } + + var dashboards []metrics_explorer.Dashboard + err = json.Unmarshal(jsonData, &dashboards) + if err != nil { + zap.L().Error("Error unmarshalling data:", zap.Error(err)) + return &model.ApiError{Typ: "UnMarshallingErr", Err: err} + } + metricDetailsDTO.Dashboards = dashboards + } + return nil + }) + + // Wait for all goroutines and handle any errors + if err := g.Wait(); err != nil { + // Type assert to check if it's already an ApiError + if apiErr, ok := err.(*model.ApiError); ok { + return metrics_explorer.MetricDetailsDTO{}, apiErr + } + // If it's not an ApiError, wrap it in one + return metrics_explorer.MetricDetailsDTO{}, &model.ApiError{Typ: "InternalError", Err: err} + } + + return metricDetailsDTO, nil +} + +func (receiver *SummaryService) ListMetricsWithSummary(ctx context.Context, params *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) { + return receiver.reader.ListSummaryMetrics(ctx, params) +} + +func (receiver *SummaryService) GetMetricsTreemap(ctx context.Context, params *metrics_explorer.TreeMapMetricsRequest) (*metrics_explorer.TreeMap, *model.ApiError) { + var response metrics_explorer.TreeMap + switch params.Treemap { + case metrics_explorer.TimeSeriesTeeMap: + cardinality, apiError := receiver.reader.GetMetricsTimeSeriesPercentage(ctx, params) + if apiError != nil { + return nil, apiError + } + response.TimeSeries = *cardinality + return &response, nil + case metrics_explorer.SamplesTreeMap: + dataPoints, apiError := receiver.reader.GetMetricsSamplesPercentage(ctx, params) + if apiError != nil { + return nil, apiError + } + response.Samples = *dataPoints + return &response, nil + default: + return nil, nil + } +} diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index a394a1045b..b2e6a5ddf0 100644 --- 
a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -316,6 +316,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, api.RegisterQueryRangeV4Routes(r, am) api.RegisterMessagingQueuesRoutes(r, am) api.RegisterThirdPartyApiRoutes(r, am) + api.MetricExplorerRoutes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/pkg/query-service/app/summary.go b/pkg/query-service/app/summary.go new file mode 100644 index 0000000000..279738e538 --- /dev/null +++ b/pkg/query-service/app/summary.go @@ -0,0 +1,104 @@ +package app + +import ( + "bytes" + "io" + "net/http" + + "github.com/gorilla/mux" + "go.signoz.io/signoz/pkg/query-service/model" + + explorer "go.signoz.io/signoz/pkg/query-service/app/metricsexplorer" + "go.uber.org/zap" +) + +func (aH *APIHandler) FilterKeysSuggestion(w http.ResponseWriter, r *http.Request) { + bodyBytes, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + ctx := r.Context() + params, apiError := explorer.ParseFilterKeySuggestions(r) + if apiError != nil { + zap.L().Error("error parsing summary filter keys request", zap.Error(apiError.Err)) + RespondError(w, apiError, nil) + return + } + keys, apiError := aH.SummaryService.FilterKeys(ctx, params) + if apiError != nil { + zap.L().Error("error getting filter keys", zap.Error(apiError.Err)) + RespondError(w, apiError, nil) + return + } + aH.Respond(w, keys) +} + +func (aH *APIHandler) FilterValuesSuggestion(w http.ResponseWriter, r *http.Request) { + bodyBytes, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + ctx := r.Context() + params, apiError := explorer.ParseFilterValueSuggestions(r) + if apiError != nil { + zap.L().Error("error parsing summary filter values request", zap.Error(apiError.Err)) + RespondError(w, apiError, nil) + return + } + + values, apiError := aH.SummaryService.FilterValues(ctx, params) + if apiError != nil { + zap.L().Error("error getting 
filter values", zap.Error(apiError.Err)) + RespondError(w, apiError, nil) + return + } + aH.Respond(w, values) +} + +func (aH *APIHandler) GetMetricsDetails(w http.ResponseWriter, r *http.Request) { + metricName := mux.Vars(r)["metric_name"] + ctx := r.Context() + metricsDetail, apiError := aH.SummaryService.GetMetricsSummary(ctx, metricName) + if apiError != nil { + zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err)) + RespondError(w, apiError, nil) + return + } + aH.Respond(w, metricsDetail) +} + +func (aH *APIHandler) ListMetrics(w http.ResponseWriter, r *http.Request) { + bodyBytes, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + ctx := r.Context() + params, apiError := explorer.ParseSummaryListMetricsParams(r) + if apiError != nil { + zap.L().Error("error parsing metric list metric summary api request", zap.Error(apiError.Err)) + RespondError(w, model.BadRequest(apiError), nil) + return + } + + slmr, apiErr := aH.SummaryService.ListMetricsWithSummary(ctx, params) + if apiErr != nil { + zap.L().Error("error getting metrics summary list", zap.Error(apiErr.Err)) + RespondError(w, apiErr, nil) + return + } + aH.Respond(w, slmr) +} + +func (aH *APIHandler) GetTreeMap(w http.ResponseWriter, r *http.Request) { + bodyBytes, _ := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + ctx := r.Context() + params, apiError := explorer.ParseTreeMapMetricsParams(r) + if apiError != nil { + zap.L().Error("error parsing metric query range params", zap.Error(apiError.Err)) + RespondError(w, apiError, nil) + return + } + result, apiError := aH.SummaryService.GetMetricsTreemap(ctx, params) + if apiError != nil { + zap.L().Error("error getting treemap data", zap.Error(apiError.Err)) + RespondError(w, apiError, nil) + return + } + aH.Respond(w, result) + +} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index a85cf59543..b56f6f0024 100644 ---
a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -85,6 +85,8 @@ var TimestampSortFeature = GetOrDefaultEnv("TIMESTAMP_SORT_FEATURE", "true") var PreferRPMFeature = GetOrDefaultEnv("PREFER_RPM_FEATURE", "false") +var MetricsExplorerClickhouseThreads = GetOrDefaultEnvInt("METRICS_EXPLORER_CLICKHOUSE_THREADS", 8) + // TODO(srikanthccv): remove after backfilling is done func UseMetricsPreAggregation() bool { return GetOrDefaultEnv("USE_METRICS_PRE_AGGREGATION", "true") == "true" @@ -231,6 +233,9 @@ const ( SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME = "time_series_v4_1week" SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME = "distributed_time_series_v4_1day" SIGNOZ_TOP_LEVEL_OPERATIONS_TABLENAME = "distributed_top_level_operations" + SIGNOZ_TIMESERIES_v4_TABLENAME = "distributed_time_series_v4" + SIGNOZ_TIMESERIES_v4_1WEEK_TABLENAME = "distributed_time_series_v4_1week" + SIGNOZ_TIMESERIES_v4_6HRS_TABLENAME = "distributed_time_series_v4_6hrs" ) // alert related constants diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index afb1e9f728..db2204f637 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/stats" "go.signoz.io/signoz/pkg/query-service/model" + "go.signoz.io/signoz/pkg/query-service/model/metrics_explorer" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/querycache" ) @@ -115,6 +116,21 @@ type Reader interface { //trace GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError + + GetAllMetricFilterAttributeValues(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) + GetAllMetricFilterUnits(ctx context.Context, req 
*metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) + GetAllMetricFilterTypes(ctx context.Context, req *metrics_explorer.FilterValueRequest) ([]string, *model.ApiError) + GetAllMetricFilterAttributeKeys(ctx context.Context, req *metrics_explorer.FilterKeyRequest, skipDotNames bool) (*[]v3.AttributeKey, *model.ApiError) + + GetMetricsDataPointsAndLastReceived(ctx context.Context, metricName string) (uint64, uint64, *model.ApiError) + GetTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, *model.ApiError) + GetActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, *model.ApiError) + GetAttributesForMetricName(ctx context.Context, metricName string) (*[]metrics_explorer.Attribute, *model.ApiError) + + ListSummaryMetrics(ctx context.Context, req *metrics_explorer.SummaryListMetricsRequest) (*metrics_explorer.SummaryListMetricsResponse, *model.ApiError) + + GetMetricsTimeSeriesPercentage(ctx context.Context, request *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) + GetMetricsSamplesPercentage(ctx context.Context, req *metrics_explorer.TreeMapMetricsRequest) (*[]metrics_explorer.TreeMapResponseItem, *model.ApiError) } type Querier interface { diff --git a/pkg/query-service/model/metrics_explorer/summary.go b/pkg/query-service/model/metrics_explorer/summary.go new file mode 100644 index 0000000000..2705bd8b5f --- /dev/null +++ b/pkg/query-service/model/metrics_explorer/summary.go @@ -0,0 +1,124 @@ +package metrics_explorer + +import ( + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +type SummaryListMetricsRequest struct { + Offset int `json:"offset"` + Limit int `json:"limit"` + OrderBy v3.OrderBy `json:"orderBy"` + Start int64 `json:"start"` + EndD int64 `json:"end"` + Filters v3.FilterSet `json:"filters"` +} + +type TreeMapType string + +const ( + TimeSeriesTeeMap TreeMapType = "timeseries" + SamplesTreeMap TreeMapType = 
"samples" +) + +type TreeMapMetricsRequest struct { + Limit int `json:"limit"` + Treemap TreeMapType `json:"treemap"` + Start int64 `json:"start"` + EndD int64 `json:"end"` + Filters v3.FilterSet `json:"filters"` +} + +type MetricDetail struct { + MetricName string `json:"metric_name"` + Description string `json:"description"` + Type string `json:"type"` + Unit string `json:"unit"` + TimeSeries uint64 `json:"timeseries"` + Samples uint64 `json:"samples"` + LastReceived int64 `json:"lastReceived"` +} + +type TreeMapResponseItem struct { + Percentage float64 `json:"percentage"` + TotalValue uint64 `json:"total_value"` + MetricName string `json:"metric_name"` +} + +type TreeMap struct { + TimeSeries []TreeMapResponseItem `json:"timeseries"` + Samples []TreeMapResponseItem `json:"samples"` +} + +type SummaryListMetricsResponse struct { + Metrics []MetricDetail `json:"metrics"` + Total uint64 `json:"total"` +} + +type Attribute struct { + Key string `json:"key" db:"key"` + Value []string `json:"value" db:"value"` + ValueCount uint64 `json:"valueCount" db:"valueCount"` +} + +// Metadata holds additional information about the metric. +type Metadata struct { + MetricType string `json:"metric_type"` + Description string `json:"description"` + Unit string `json:"unit"` +} + +// Alert represents individual alerts associated with the metric. +type Alert struct { + AlertName string `json:"alert_name"` + AlertID string `json:"alert_id"` +} + +// Dashboard represents individual dashboards associated with the metric. 
+type Dashboard struct { + DashboardName string `json:"dashboard_name"` + DashboardID string `json:"dashboard_id"` + WidgetID string `json:"widget_id"` + WidgetName string `json:"widget_name"` +} + +type MetricDetailsDTO struct { + Name string `json:"name"` + Description string `json:"description"` + Type string `json:"type"` + Unit string `json:"unit"` + Samples uint64 `json:"samples"` + TimeSeriesTotal uint64 `json:"timeSeriesTotal"` + TimeSeriesActive uint64 `json:"timeSeriesActive"` + LastReceived uint64 `json:"lastReceived"` + Attributes []Attribute `json:"attributes"` + Metadata Metadata `json:"metadata"` + Alerts []Alert `json:"alerts"` + Dashboards []Dashboard `json:"dashboards"` +} + +type FilterKeyRequest struct { + SearchText string `json:"searchText"` + Limit int `json:"limit"` +} + +type FilterValueRequest struct { + FilterKey string `json:"filterKey"` + FilterAttributeKeyDataType v3.AttributeKeyDataType `json:"filterAttributeKeyDataType"` + SearchText string `json:"searchText"` + Limit int `json:"limit"` +} + +type FilterValueResponse struct { + FilterValues []string `json:"filterValues"` +} + +type FilterKeyResponse struct { + MetricColumns []string `json:"metricColumns"` + AttributeKeys []v3.AttributeKey `json:"attributeKeys"` +} + +var AvailableColumnFilterMap = map[string]bool{ + "metric_name": true, + "metric_unit": true, + "metric_type": true, +} diff --git a/pkg/query-service/utils/filter_conditions.go b/pkg/query-service/utils/filter_conditions.go new file mode 100644 index 0000000000..5b496320f4 --- /dev/null +++ b/pkg/query-service/utils/filter_conditions.go @@ -0,0 +1,153 @@ +package utils + +import ( + "fmt" + "strings" + "time" + + "go.signoz.io/signoz/pkg/query-service/constants" + "go.signoz.io/signoz/pkg/query-service/model/metrics_explorer" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +// skipKey is an optional parameter to skip processing of a specific key +func BuildFilterConditions(fs *v3.FilterSet, skipKey string) 
([]string, error) { + if fs == nil || len(fs.Items) == 0 { + return nil, nil + } + + var conditions []string + + for _, item := range fs.Items { + if skipKey != "" && item.Key.Key == skipKey { + continue + } + + toFormat := item.Value + op := v3.FilterOperator(strings.ToLower(strings.TrimSpace(string(item.Operator)))) + if op == v3.FilterOperatorContains || op == v3.FilterOperatorNotContains { + toFormat = fmt.Sprintf("%%%s%%", toFormat) + } + fmtVal := ClickHouseFormattedValue(toFormat) + + // Determine if the key is a JSON key or a normal column + isJSONKey := false + if _, exists := metrics_explorer.AvailableColumnFilterMap[item.Key.Key]; exists { + isJSONKey = false + } else { + isJSONKey = true + } + + condition, err := buildSingleFilterCondition(item.Key.Key, op, fmtVal, isJSONKey) + if err != nil { + return nil, err + } + conditions = append(conditions, condition) + } + + return conditions, nil +} + +func buildSingleFilterCondition(key string, op v3.FilterOperator, fmtVal string, isJSONKey bool) (string, error) { + var keyCondition string + if isJSONKey { + keyCondition = fmt.Sprintf("JSONExtractString(labels, '%s')", key) + } else { // Assuming normal column access + if key == "metric_unit" { + key = "unit" + } + if key == "metric_type" { + key = "type" + } + keyCondition = key + } + + switch op { + case v3.FilterOperatorEqual: + return fmt.Sprintf("%s = %s", keyCondition, fmtVal), nil + case v3.FilterOperatorNotEqual: + return fmt.Sprintf("%s != %s", keyCondition, fmtVal), nil + case v3.FilterOperatorIn: + return fmt.Sprintf("%s IN %s", keyCondition, fmtVal), nil + case v3.FilterOperatorNotIn: + return fmt.Sprintf("%s NOT IN %s", keyCondition, fmtVal), nil + case v3.FilterOperatorLike: + return fmt.Sprintf("like(%s, %s)", keyCondition, fmtVal), nil + case v3.FilterOperatorNotLike: + return fmt.Sprintf("notLike(%s, %s)", keyCondition, fmtVal), nil + case v3.FilterOperatorRegex: + return fmt.Sprintf("match(%s, %s)", keyCondition, fmtVal), nil + case 
v3.FilterOperatorNotRegex: + return fmt.Sprintf("not match(%s, %s)", keyCondition, fmtVal), nil + case v3.FilterOperatorGreaterThan: + return fmt.Sprintf("%s > %s", keyCondition, fmtVal), nil + case v3.FilterOperatorGreaterThanOrEq: + return fmt.Sprintf("%s >= %s", keyCondition, fmtVal), nil + case v3.FilterOperatorLessThan: + return fmt.Sprintf("%s < %s", keyCondition, fmtVal), nil + case v3.FilterOperatorLessThanOrEq: + return fmt.Sprintf("%s <= %s", keyCondition, fmtVal), nil + case v3.FilterOperatorContains: + return fmt.Sprintf("like(%s, %s)", keyCondition, fmtVal), nil + case v3.FilterOperatorNotContains: + return fmt.Sprintf("notLike(%s, %s)", keyCondition, fmtVal), nil + case v3.FilterOperatorExists: + return fmt.Sprintf("has(JSONExtractKeys(labels), '%s')", key), nil + case v3.FilterOperatorNotExists: + return fmt.Sprintf("not has(JSONExtractKeys(labels), '%s')", key), nil + default: + return "", fmt.Errorf("unsupported filter operator: %s", op) + } +} + +var ( + sixHoursInMilliseconds = time.Hour.Milliseconds() * 6 + oneDayInMilliseconds = time.Hour.Milliseconds() * 24 + oneWeekInMilliseconds = oneDayInMilliseconds * 7 +) + +func WhichTSTableToUse(start, end int64) (int64, int64, string, string) { + + var tableName string + var localTableName string + if end-start < sixHoursInMilliseconds { + // adjust the start time to nearest 1 hour + start = start - (start % (time.Hour.Milliseconds() * 1)) + tableName = constants.SIGNOZ_TIMESERIES_v4_TABLENAME + localTableName = constants.SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME + } else if end-start < oneDayInMilliseconds { + // adjust the start time to nearest 6 hours + start = start - (start % (time.Hour.Milliseconds() * 6)) + tableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_TABLENAME + localTableName = constants.SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME + } else if end-start < oneWeekInMilliseconds { + // adjust the start time to nearest 1 day + start = start - (start % (time.Hour.Milliseconds() * 24)) + tableName = 
constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME + localTableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME + } else { + if constants.UseMetricsPreAggregation() { + // adjust the start time to nearest 1 week + start = start - (start % (time.Hour.Milliseconds() * 24 * 7)) + tableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_TABLENAME + localTableName = constants.SIGNOZ_TIMESERIES_v4_1WEEK_LOCAL_TABLENAME + } else { + // continue to use the 1 day table + start = start - (start % (time.Hour.Milliseconds() * 24)) + tableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_TABLENAME + localTableName = constants.SIGNOZ_TIMESERIES_v4_1DAY_LOCAL_TABLENAME + } + } + + return start, end, tableName, localTableName +} + +func WhichSampleTableToUse(start, end int64) (string, string) { + if end-start < oneDayInMilliseconds { + return constants.SIGNOZ_SAMPLES_V4_TABLENAME, "count(*)" + } else if end-start < oneWeekInMilliseconds { + return constants.SIGNOZ_SAMPLES_V4_AGG_5M_TABLENAME, "sum(count)" + } else { + return constants.SIGNOZ_SAMPLES_V4_AGG_30M_TABLENAME, "sum(count)" + } +} diff --git a/pkg/telemetrystore/telemetrystorehook/settings.go b/pkg/telemetrystore/telemetrystorehook/settings.go index 834baf56a2..c97ac81d4b 100644 --- a/pkg/telemetrystore/telemetrystorehook/settings.go +++ b/pkg/telemetrystore/telemetrystorehook/settings.go @@ -63,6 +63,10 @@ func (h *provider) clickHouseSettings(ctx context.Context, query string, args .. settings["timeout_before_checking_execution_speed"] = h.settings.TimeoutBeforeCheckingExecutionSpeed } + if ctx.Value("clickhouse_max_threads") != nil { + if maxThreads, ok := ctx.Value("clickhouse_max_threads").(int); ok { settings["max_threads"] = maxThreads } + } + ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings)) return ctx, query, args }