From 3ff0aa4b4bc271d5fa83d433e8d7afadbe433a1a Mon Sep 17 00:00:00 2001 From: shivanshu Date: Wed, 31 Jul 2024 17:55:13 +0530 Subject: [PATCH] chore: consumer group filtering --- .../messagingQueues/kafka/consumerLag.md | 70 +++---------------- .../integrations/messagingQueues/kafka/sql.go | 37 +++++----- .../messagingQueues/kafka/translator.go | 8 ++- 3 files changed, 32 insertions(+), 83 deletions(-) diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index c3691ecc36..5e12a87e76 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -19,52 +19,14 @@ POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details "end": 1721290096000000000, "variables": { "partition": "0", - "topic": "topic1" + "topic": "topic1", + "consumer_group": "cg1" } } ``` response in query range format `series` ```json -{ - "status": "success", - "data": { - "resultType": "", - "result": [ - { - "queryName": "producer", - "series": [ - { - "labels": { - "error_rate": "0", - "p99_query.p99": "150.08830908000002", - "rps": "0.00016534391534391533", - "service_name": "producer-svc" - }, - "labelsArray": [ - { - "service_name": "producer-svc" - }, - { - "p99_query.p99": "150.08830908000002" - }, - { - "error_rate": "0" - }, - { - "rps": "0.00016534391534391533" - } - ], - "values": [] - } - ] - } - ] - } -} -``` -response in query range format `table` -```json { "status": "success", "data": { @@ -73,11 +35,6 @@ response in query range format `table` { "table": { "columns": [ - { - "name": "consumer_group", - "queryName": "", - "isValueColumn": false - }, { "name": "service_name", "queryName": "", @@ -108,22 +65,11 @@ response in query range format `table` { "data": { "avg_msg_size": "0", - "consumer_group": "cg1", "error_rate": "0", "p99": "0.2942205100000016", "service_name": "consumer-svc", "throughput": "0.00016534391534391533" } - }, - { - "data": { - "avg_msg_size": "0", - "consumer_group": "cg3", - "error_rate": "0", - "p99": "0.216600410000002", - "service_name": "consumer-svc", - "throughput": "0.00016534391534391533" - } } ] } @@ -145,12 +91,12 @@ POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details ```json { - "start": 1720685296000000000, - "end": 1721290096000000000, - "variables": { - "partition": "0", - "topic": "topic1" - } + "start": 1720685296000000000, + "end": 1721290096000000000, + "variables": { + "partition": "0", + "topic": "topic1" + } } ``` diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index 552fdc5d7a..f479ea5ac9 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -4,13 +4,12 @@ import ( "fmt" ) -func generateConsumerSQL(start, end int64, topic, partition, queueType string) string { +func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { query := fmt.Sprintf(` WITH -- Sub query for p99 calculation p99_query AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] as consumer_group, serviceName, quantile(0.99)(durationNano) / 1000000 as p99 FROM signoz_traces.signoz_index_v2 @@ -21,13 +20,13 @@ p99_query AS ( AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName ), -- Sub query for RPS calculation rps_query AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, serviceName, count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds FROM signoz_traces.signoz_index_v2 @@ -38,13 +37,13 @@ rps_query AS ( AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName ), -- Sub query for error rate calculation error_rate_query AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, serviceName, count(*) / ((%d - %d) / 1000000000) AS error_rate -- Convert nanoseconds to seconds FROM signoz_traces.signoz_index_v2 @@ -56,13 +55,13 @@ error_rate_query AS ( AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName ), -- Sub query for average message size calculation avg_msg_size_query AS ( SELECT - stringTagMap['messaging.kafka.consumer.group'] AS consumer_group, serviceName, avg(numberTagMap['messaging.message.body.size']) AS avg_msg_size FROM signoz_traces.signoz_index_v2 @@ -73,12 +72,12 @@ avg_msg_size_query AS ( AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' - GROUP BY consumer_group, serviceName + AND stringTagMap['messaging.kafka.consumer.group'] = '%s' + GROUP BY serviceName ) -- Main query to combine all metrics SELECT - p99_query.consumer_group AS consumer_group, p99_query.serviceName AS service_name, p99_query.p99 AS p99, COALESCE(error_rate_query.error_rate, 0) AS error_rate, @@ -86,15 +85,14 @@ SELECT COALESCE(avg_msg_size_query.avg_msg_size, 0) AS avg_msg_size FROM p99_query - LEFT JOIN rps_query ON p99_query.consumer_group = rps_query.consumer_group - AND p99_query.serviceName = rps_query.serviceName - LEFT JOIN error_rate_query ON p99_query.consumer_group = error_rate_query.consumer_group - AND p99_query.serviceName = error_rate_query.serviceName - LEFT JOIN avg_msg_size_query ON p99_query.consumer_group = avg_msg_size_query.consumer_group - AND p99_query.serviceName = avg_msg_size_query.serviceName + LEFT JOIN rps_query ON p99_query.serviceName = rps_query.serviceName + LEFT JOIN error_rate_query ON p99_query.serviceName = error_rate_query.serviceName + LEFT JOIN avg_msg_size_query ON p99_query.serviceName = avg_msg_size_query.serviceName ORDER BY - p99_query.consumer_group; -`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, queueType, topic, partition) + p99_query.serviceName; +`, start, end, queueType, topic, partition, consumerGroup, end, start, start, end, queueType, topic, + partition, consumerGroup, end, start, start, end, queueType, topic, partition, + consumerGroup, end, start, queueType, topic, partition, consumerGroup) return query } @@ -167,6 +165,7 @@ FROM ORDER BY p99_query.serviceName; -`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition) +`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, + partition, end, start, start, end, queueType, topic, partition) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 99760d7fbb..98414ebf0f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -44,16 +44,20 @@ func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer } partition, ok := messagingQueue.Variables["partition"] - if !ok { return nil, fmt.Errorf("invalid type for Partition") } + consumerGroup, ok := messagingQueue.Variables["consumer_group"] + if !ok { + return nil, fmt.Errorf("invalid type for consumer group") + } + var query string if queryContext == "producer" { query = generateProducerSQL(start, end, topic, partition, queueType) } else if queryContext == "consumer" { - query = generateConsumerSQL(start, end, topic, partition, queueType) + query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType) } return &v3.ClickHouseQuery{