[feat] Add overview APIs for Celery and Kafka in messaging queues integration (#6756)

* feat: added queue overview api for generic messaging system
Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
Shivanshu Raj Shrivastava 2025-01-08 12:20:11 +05:30 committed by GitHub
parent 80740f646c
commit 505757b971
4 changed files with 327 additions and 38 deletions

View File

@@ -2501,32 +2501,57 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
// RegisterMessagingQueuesRoutes adds messaging-queues routes
func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {
// SubRouter for kafka
- kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter()
// Main messaging queues router
messagingQueuesRouter := router.PathPrefix("/api/v1/messaging-queues").Subrouter()
// Queue Overview route
messagingQueuesRouter.HandleFunc("/queue-overview", am.ViewAccess(aH.getQueueOverview)).Methods(http.MethodPost)
// -------------------------------------------------
// Kafka-specific routes
kafkaRouter := messagingQueuesRouter.PathPrefix("/kafka").Subrouter()
onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter()
onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost)
onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost)
onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost)
partitionLatency := kafkaRouter.PathPrefix("/partition-latency").Subrouter()
partitionLatency.HandleFunc("/overview", am.ViewAccess(aH.getPartitionOverviewLatencyData)).Methods(http.MethodPost)
partitionLatency.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerPartitionLatencyData)).Methods(http.MethodPost)
consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter()
consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)
topicThroughput := kafkaRouter.PathPrefix("/topic-throughput").Subrouter()
topicThroughput.HandleFunc("/producer", am.ViewAccess(aH.getProducerThroughputOverview)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerThroughputDetails)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerThroughputOverview)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerThroughputDetails)).Methods(http.MethodPost)
spanEvaluation := kafkaRouter.PathPrefix("/span").Subrouter()
spanEvaluation.HandleFunc("/evaluation", am.ViewAccess(aH.getProducerConsumerEval)).Methods(http.MethodPost)
// -------------------------------------------------
// Celery-specific routes
celeryRouter := messagingQueuesRouter.PathPrefix("/celery").Subrouter()
// Celery overview routes
celeryRouter.HandleFunc("/overview", am.ViewAccess(aH.getCeleryOverview)).Methods(http.MethodPost)
// Celery tasks routes
celeryRouter.HandleFunc("/tasks", am.ViewAccess(aH.getCeleryTasks)).Methods(http.MethodPost)
// Celery performance routes
celeryRouter.HandleFunc("/performance", am.ViewAccess(aH.getCeleryPerformance)).Methods(http.MethodPost)
// for other messaging queues, add SubRouters here
}
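For reference, a minimal client-side sketch of calling the new queue-overview route (not part of this commit). The JSON field names start, end, and variables are assumptions based on the MessagingQueue struct that ParseMessagingQueueBody decodes; timestamps are in nanoseconds, matching the toDateTime64 conversions in generateOverviewSQL. The host and port assume a default query-service setup, and ViewAccess will additionally expect a valid auth token.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func main() {
	end := time.Now().UnixNano()
	start := end - int64(30*time.Minute) // last 30 minutes, in nanoseconds

	// Assumed request shape; filter values are comma-separated and are
	// split server-side by parseFilter.
	body, _ := json.Marshal(map[string]interface{}{
		"start": start,
		"end":   end,
		"variables": map[string]string{
			"service_name": "checkout-service,payment-service",
			"kind":         "Producer",
		},
	})

	resp, err := http.Post(
		"http://localhost:8080/api/v1/messaging-queues/queue-overview",
		"application/json",
		bytes.NewReader(body),
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}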
@@ -4196,7 +4221,7 @@ func (aH *APIHandler) autocompleteAggregateAttributes(w http.ResponseWriter, r *
switch req.DataSource {
case v3.DataSourceMetrics:
- response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, false)
response, err = aH.reader.GetMetricAggregateAttributes(r.Context(), req, true)
case v3.DataSourceLogs:
response, err = aH.reader.GetLogAggregateAttributes(r.Context(), req)
case v3.DataSourceTraces:
@@ -4936,3 +4961,39 @@ func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) {
}
aH.WriteJSON(w, r, field)
}
func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
// ToDo: support dynamic filtering on any QueueFilters field
messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}
chq, err := mq.BuildClickHouseQuery(messagingQueue, "", "overview")
if err != nil {
zap.L().Error(err.Error())
// apiErr is nil on this path, so wrap the plain error before responding
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil)
return
}
aH.Respond(w, results)
}
func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery overview logic for both worker and tasks types
}
func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery tasks logic for both state and list types
}
func (aH *APIHandler) getCeleryPerformance(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery performance logic for error, rate, and latency types
}

View File

@@ -22,3 +22,37 @@ type OnboardingResponse struct {
Message string `json:"error_message"`
Status string `json:"status"`
}
// QueueFilters
// ToDo: support dynamic filtering on any of these fields
type QueueFilters struct {
ServiceName []string
SpanName []string
Queue []string
Destination []string
Kind []string
}
type CeleryTask struct {
kind string
status string
}
type CeleryTasks interface {
GetKind() string
GetStatus() string
Set(string, string)
}
func (r *CeleryTask) GetKind() string {
return r.kind
}
func (r *CeleryTask) GetStatus() string {
return r.status
}
func (r *CeleryTask) Set(kind, status string) {
r.kind = kind
r.status = status
}
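A quick usage sketch for the CeleryTasks interface above; NewCeleryTask is a hypothetical helper, not part of this commit.

// Hypothetical constructor living alongside the types above.
func NewCeleryTask(kind, status string) CeleryTasks {
	return &CeleryTask{kind: kind, status: status}
}

// Usage:
//   t := NewCeleryTask("worker", "active")
//   t.Set("tasks", "completed") // replaces both fields via the interface
//   t.GetKind()                 // "tasks"
//   t.GetStatus()               // "completed"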

View File

@@ -2,6 +2,7 @@ package kafka
import (
"fmt"
"strings"
)
func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
@@ -318,6 +319,139 @@ GROUP BY
return query
}
// generateOverviewSQL builds the ClickHouse SQL query with optional filters.
// If a filter slice is empty, the query does not constrain on that field.
func generateOverviewSQL(start, end int64, filters *QueueFilters) string {
// Convert from nanoseconds to float seconds in Go to avoid decimal overflow in ClickHouse
startSeconds := float64(start) / 1e9
endSeconds := float64(end) / 1e9
// Compute time range difference in Go
timeRangeSecs := endSeconds - startSeconds
// ts_bucket boundaries: pad the window start by 1800s (30 min) so spans
// whose bucket began before the query window are still scanned
tsBucketStart := startSeconds - 1800
tsBucketEnd := endSeconds
// Build WHERE clauses for optional filters
// We always require messaging_system IN ('kafka', 'celery'), but
// we add additional AND conditions only if the slices are non-empty.
var whereClauses []string
// Mandatory base filter: show only kafka/celery
whereClauses = append(whereClauses, "messaging_system IN ('kafka', 'celery')")
if len(filters.ServiceName) > 0 {
whereClauses = append(whereClauses, inClause("service_name", filters.ServiceName))
}
if len(filters.SpanName) > 0 {
whereClauses = append(whereClauses, inClause("span_name", filters.SpanName))
}
if len(filters.Queue) > 0 {
// "queue" in the struct refers to the messaging_system in the DB
whereClauses = append(whereClauses, inClause("messaging_system", filters.Queue))
}
if len(filters.Destination) > 0 {
whereClauses = append(whereClauses, inClause("destination", filters.Destination))
}
if len(filters.Kind) > 0 {
whereClauses = append(whereClauses, inClause("kind_string", filters.Kind))
}
// Combine all WHERE clauses with AND; whereClauses always contains the
// mandatory base filter above, so the result is never empty
whereSQL := fmt.Sprintf("AND %s", strings.Join(whereClauses, "\n AND "))
// Final query string
// Note the use of %f for float64 values in fmt.Sprintf
query := fmt.Sprintf(`
WITH
processed_traces AS (
SELECT
resource_string_service$$name AS service_name,
name AS span_name,
CASE
WHEN attribute_string_messaging$$system != '' THEN attribute_string_messaging$$system
WHEN (has(attributes_string, 'celery.action') OR has(attributes_string, 'celery.task_name')) THEN 'celery'
ELSE 'undefined'
END AS messaging_system,
kind_string,
COALESCE(
NULLIF(attributes_string['messaging.destination.name'], ''),
NULLIF(attributes_string['messaging.destination'], '')
) AS destination,
durationNano,
status_code
FROM signoz_traces.distributed_signoz_index_v3
WHERE
timestamp >= toDateTime64(%f, 9)
AND timestamp <= toDateTime64(%f, 9)
AND ts_bucket_start >= toDateTime64(%f, 9)
AND ts_bucket_start <= toDateTime64(%f, 9)
AND (
attribute_string_messaging$$system = 'kafka'
OR has(attributes_string, 'celery.action')
OR has(attributes_string, 'celery.task_name')
)
%s
),
aggregated_metrics AS (
SELECT
service_name,
span_name,
messaging_system,
destination,
kind_string,
count(*) AS total_count,
sumIf(1, status_code = 2) AS error_count,
quantile(0.95)(durationNano) / 1000000 AS p95_latency -- Convert to ms
FROM
processed_traces
GROUP BY
service_name,
span_name,
messaging_system,
destination,
kind_string
)
SELECT
aggregated_metrics.service_name,
aggregated_metrics.span_name,
aggregated_metrics.messaging_system,
aggregated_metrics.destination,
aggregated_metrics.kind_string,
COALESCE(aggregated_metrics.total_count / %f, 0) AS throughput,
COALESCE((aggregated_metrics.error_count * 100.0) / aggregated_metrics.total_count, 0) AS error_percentage,
aggregated_metrics.p95_latency
FROM
aggregated_metrics
ORDER BY
aggregated_metrics.service_name,
aggregated_metrics.span_name;
`,
startSeconds, endSeconds,
tsBucketStart, tsBucketEnd,
whereSQL, timeRangeSecs,
)
return query
}
// inClause returns SQL like "fieldName IN ('val1','val2','val3')"
func inClause(fieldName string, values []string) string {
// Quote and escape each value for safety
var quoted []string
for _, v := range values {
// Simple escape: replace any single quotes in v
safeVal := strings.ReplaceAll(v, "'", "''")
quoted = append(quoted, fmt.Sprintf("'%s'", safeVal))
}
return fmt.Sprintf("%s IN (%s)", fieldName, strings.Join(quoted, ","))
}
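To make the filter plumbing concrete, a short sketch with illustrative values (not part of this commit). Note that inClause doubles embedded single quotes, so o'brien is emitted as 'o''brien'.

func exampleOverviewQuery(startNs, endNs int64) string {
	// Empty slices are skipped, so only these two become extra WHERE clauses:
	//   service_name IN ('checkout-service')
	//   kind_string IN ('Producer','Consumer')
	filters := &QueueFilters{
		ServiceName: []string{"checkout-service"},
		Kind:        []string{"Producer", "Consumer"},
	}
	return generateOverviewSQL(startNs, endNs, filters)
}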
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
tsBucketStart := (start / 1000000000) - 1800

View File

@@ -2,9 +2,11 @@ package kafka
import (
"fmt"
"go.signoz.io/signoz/pkg/query-service/common"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"strings"
)
var defaultStepInterval int64 = 60
@@ -14,18 +16,19 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
if constants.KafkaSpanEval == "false" && queryContext == "producer-consumer-eval" {
return nil, fmt.Errorf("span evaluation feature is disabled and is experimental")
}
// ToDo: propagate this through APIs when there are different handlers
queueType := KafkaQueue
chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
if err != nil {
return nil, err
}
- var cq *v3.CompositeQuery
- cq, err = buildCompositeQuery(chq, queryContext)
cq, err := buildCompositeQuery(chq, queryContext)
if err != nil {
return nil, err
}
queryRangeParams := &v3.QueryRangeParamsV3{
Start: messagingQueue.Start,
@@ -42,6 +45,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
consumerGroup, ok := messagingQueue.Variables["consumer_group"]
if !ok {
return nil, fmt.Errorf("consumer_group not found in the request")
@@ -53,15 +57,18 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin
}
query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, partitionID, queueType)
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
- func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
func buildBuilderQueriesProducerBytes(
unixMilliStart, unixMilliEnd int64,
attributeCache *Clients,
) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)
queryName := fmt.Sprintf("byte_rate")
queryName := "byte_rate"
chq := &v3.BuilderQuery{
QueryName: queryName,
@@ -102,11 +109,12 @@ func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attrib
},
Expression: queryName,
ReduceTo: v3.ReduceToOperatorAvg,
- GroupBy: []v3.AttributeKey{{
- Key: "service_name",
- DataType: v3.AttributeKeyDataTypeString,
- Type: v3.AttributeKeyTypeTag,
- },
GroupBy: []v3.AttributeKey{
{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
{
Key: "topic",
DataType: v3.AttributeKeyDataTypeString,
@@ -118,9 +126,13 @@ func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attrib
return bq, nil
}
- func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
func buildBuilderQueriesNetwork(
unixMilliStart, unixMilliEnd int64,
attributeCache *Clients,
) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)
queryName := fmt.Sprintf("latency")
queryName := "latency"
chq := &v3.BuilderQuery{
QueryName: queryName,
@@ -167,11 +179,12 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
},
Expression: queryName,
ReduceTo: v3.ReduceToOperatorAvg,
- GroupBy: []v3.AttributeKey{{
- Key: "service_name",
- DataType: v3.AttributeKeyDataTypeString,
- Type: v3.AttributeKeyTypeTag,
- },
GroupBy: []v3.AttributeKey{
{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
{
Key: "client_id",
DataType: v3.AttributeKeyDataTypeString,
@@ -189,6 +202,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
}
func BuildBuilderQueriesKafkaOnboarding(messagingQueue *MessagingQueue) (*v3.QueryRangeParamsV3, error) {
bq := make(map[string]*v3.BuilderQuery)
unixMilliStart := messagingQueue.Start / 1000000
@@ -242,7 +256,11 @@ func BuildBuilderQueriesKafkaOnboarding(messagingQueue *MessagingQueue) (*v3.Que
return queryRangeParams, nil
}
- func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
func BuildQRParamsWithCache(
messagingQueue *MessagingQueue,
queryContext string,
attributeCache *Clients,
) (*v3.QueryRangeParamsV3, error) {
queueType := KafkaQueue
@@ -254,11 +272,9 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,
if queryContext == "throughput" {
chq, err := buildClickHouseQueryNetwork(messagingQueue, queueType)
if err != nil {
return nil, err
}
cq, err = buildCompositeQuery(chq, queryContext)
} else if queryContext == "fetch-latency" {
@@ -271,14 +287,15 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,
BuilderQueries: bhq,
PanelType: v3.PanelTypeTable,
}
} else if queryContext == "producer-throughput-overview" {
start := messagingQueue.Start
end := messagingQueue.End
query := generateProducerPartitionThroughputSQL(start, end, queueType)
cq, err = buildCompositeQuery(&v3.ClickHouseQuery{
Query: query,
}, queryContext)
} else if queryContext == "producer-throughput-overview-byte-rate" {
bhq, err := buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd, attributeCache)
if err != nil {
@@ -304,22 +321,60 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string,
return queryRangeParams, err
}
- func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
func getFilters(variables map[string]string) *QueueFilters {
return &QueueFilters{
ServiceName: parseFilter(variables["service_name"]),
SpanName: parseFilter(variables["span_name"]),
Queue: parseFilter(variables["queue"]),
Destination: parseFilter(variables["destination"]),
Kind: parseFilter(variables["kind"]),
}
}
// parseFilter splits a comma-separated string into a []string.
// Returns an empty slice if the input is blank.
func parseFilter(val string) []string {
if val == "" {
return []string{}
}
// Split on commas, trim whitespace around each part
parts := strings.Split(val, ",")
var out []string
for _, p := range parts {
trimmed := strings.TrimSpace(p)
if trimmed != "" {
out = append(out, trimmed)
}
}
return out
}
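For reference, the expected behavior of the two helpers above, sketched as comments with illustrative values:

// parseFilter("a, b , ,c") => []string{"a", "b", "c"}  (whitespace trimmed, blanks dropped)
// parseFilter("")          => []string{}
//
// getFilters(map[string]string{"service_name": "svc-a, svc-b"})
//   => &QueueFilters{ServiceName: []string{"svc-a", "svc-b"}} with the other fields empty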
func BuildClickHouseQuery(
messagingQueue *MessagingQueue,
queueType string,
queryContext string,
) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
var topic, partition string
if queryContext == "producer" ||
queryContext == "consumer" ||
queryContext == "consumer_partition_latency" ||
queryContext == "producer-throughput-details" ||
queryContext == "consumer-throughput-details" {
var ok bool
topic, ok = messagingQueue.Variables["topic"]
if !ok {
return nil, fmt.Errorf("invalid type for Topic")
}
if !(queryContext == "consumer-throughput-details" || queryContext == "producer-throughput-details") {
if !(queryContext == "consumer-throughput-details" ||
queryContext == "producer-throughput-details") {
partition, ok = messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
@@ -328,39 +383,44 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
}
var query string
if queryContext == "producer" {
switch queryContext {
case "overview":
query = generateOverviewSQL(start, end, getFilters(messagingQueue.Variables))
case "producer":
query = generateProducerSQL(start, end, topic, partition, queueType)
} else if queryContext == "consumer" {
case "consumer":
consumerGroup, ok := messagingQueue.Variables["consumer_group"]
if !ok {
return nil, fmt.Errorf("invalid type for consumer group")
}
query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
} else if queryContext == "producer-topic-throughput" {
case "producer-topic-throughput":
query = generatePartitionLatencySQL(start, end, queueType)
} else if queryContext == "consumer_partition_latency" {
case "consumer_partition_latency":
query = generateConsumerPartitionLatencySQL(start, end, topic, partition, queueType)
} else if queryContext == "producer-throughput-details" {
case "producer-throughput-details":
svcName, ok := messagingQueue.Variables["service_name"]
if !ok {
return nil, fmt.Errorf("invalid type for service")
}
query = generateProducerTopicLatencySQL(start, end, topic, svcName, queueType)
} else if queryContext == "consumer-throughput-overview" {
case "consumer-throughput-overview":
query = generateConsumerLatencySQL(start, end, queueType)
} else if queryContext == "consumer-throughput-details" {
case "consumer-throughput-details":
svcName, ok := messagingQueue.Variables["service_name"]
if !ok {
return nil, fmt.Errorf("invalid type for service")
}
query = generateConsumerServiceLatencySQL(start, end, topic, svcName, queueType)
} else if queryContext == "producer-consumer-eval" {
case "producer-consumer-eval":
query = generateProducerConsumerEvalSQL(start, end, queueType, messagingQueue.EvalTime)
} else if queryContext == "onboard_producers" {
case "onboard_producers":
query = onboardProducersSQL(start, end, queueType)
} else if queryContext == "onboard_consumers" {
case "onboard_consumers":
query = onboardConsumerSQL(start, end, queueType)
}
return &v3.ClickHouseQuery{
Query: query,
}, nil
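End to end, the new overview context flows through this function roughly as below (a sketch with illustrative values). queueType is unused for "overview", which is why getQueueOverview passes an empty string.

mqParams := &MessagingQueue{
	Start: startNs, // nanoseconds
	End:   endNs,
	Variables: map[string]string{
		"service_name": "checkout-service",
		"kind":         "Producer",
	},
}
chq, err := BuildClickHouseQuery(mqParams, "", "overview")
if err != nil {
	// handle error
}
_ = chq.Query // raw ClickHouse SQL produced by generateOverviewSQL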