chore: consumer group filtering

shivanshu 2024-07-31 17:55:13 +05:30
parent 063c9adba6
commit 3ff0aa4b4b
3 changed files with 32 additions and 83 deletions

View File

@@ -19,52 +19,14 @@ POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details
     "end": 1721290096000000000,
     "variables": {
         "partition": "0",
-        "topic": "topic1"
+        "topic": "topic1",
+        "consumer_group": "cg1"
     }
 }
 ```

 response in query range format `series`
 ```json
-{
-    "status": "success",
-    "data": {
-        "resultType": "",
-        "result": [
-            {
-                "queryName": "producer",
-                "series": [
-                    {
-                        "labels": {
-                            "error_rate": "0",
-                            "p99_query.p99": "150.08830908000002",
-                            "rps": "0.00016534391534391533",
-                            "service_name": "producer-svc"
-                        },
-                        "labelsArray": [
-                            {
-                                "service_name": "producer-svc"
-                            },
-                            {
-                                "p99_query.p99": "150.08830908000002"
-                            },
-                            {
-                                "error_rate": "0"
-                            },
-                            {
-                                "rps": "0.00016534391534391533"
-                            }
-                        ],
-                        "values": []
-                    }
-                ]
-            }
-        ]
-    }
-}
-```
-response in query range format `table`
-```json
 {
     "status": "success",
     "data": {
@@ -73,11 +35,6 @@ response in query range format `table`
             {
                 "table": {
                     "columns": [
-                        {
-                            "name": "consumer_group",
-                            "queryName": "",
-                            "isValueColumn": false
-                        },
                         {
                             "name": "service_name",
                             "queryName": "",
@@ -108,22 +65,11 @@ response in query range format `table`
                         {
                             "data": {
                                 "avg_msg_size": "0",
-                                "consumer_group": "cg1",
                                 "error_rate": "0",
                                 "p99": "0.2942205100000016",
                                 "service_name": "consumer-svc",
                                 "throughput": "0.00016534391534391533"
                             }
-                        },
-                        {
-                            "data": {
-                                "avg_msg_size": "0",
-                                "consumer_group": "cg3",
-                                "error_rate": "0",
-                                "p99": "0.216600410000002",
-                                "service_name": "consumer-svc",
-                                "throughput": "0.00016534391534391533"
-                            }
                         }
                     ]
                 }
@@ -145,12 +91,12 @@ POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details
 ```json
 {
     "start": 1720685296000000000,
     "end": 1721290096000000000,
     "variables": {
         "partition": "0",
         "topic": "topic1"
     }
 }
 ```
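
For reference, a minimal client-side sketch of calling the consumer-details endpoint with the new `consumer_group` variable. This is not part of the commit; the base URL and a locally running query service are assumptions.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Request body mirroring the documented example: the consumer query now
	// expects a "consumer_group" variable alongside "topic" and "partition".
	body := map[string]interface{}{
		"start": 1720685296000000000,
		"end":   1721290096000000000,
		"variables": map[string]string{
			"partition":      "0",
			"topic":          "topic1",
			"consumer_group": "cg1",
		},
	}
	payload, err := json.Marshal(body)
	if err != nil {
		panic(err)
	}

	// Base URL is an assumption; point it at wherever the query service runs.
	url := "http://localhost:8080/api/v1/messaging-queues/kafka/consumer-lag/consumer-details"
	resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	out, _ := io.ReadAll(resp.Body)
	fmt.Println(string(out))
}
```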

View File

@@ -4,13 +4,12 @@ import (
 	"fmt"
 )

-func generateConsumerSQL(start, end int64, topic, partition, queueType string) string {
+func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
 	query := fmt.Sprintf(`
 WITH
 -- Sub query for p99 calculation
 p99_query AS (
     SELECT
-        stringTagMap['messaging.kafka.consumer.group'] as consumer_group,
         serviceName,
         quantile(0.99)(durationNano) / 1000000 as p99
     FROM signoz_traces.signoz_index_v2
@@ -21,13 +20,13 @@ p99_query AS (
         AND msgSystem = '%s'
         AND stringTagMap['messaging.destination.name'] = '%s'
         AND stringTagMap['messaging.destination.partition.id'] = '%s'
-    GROUP BY consumer_group, serviceName
+        AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
+    GROUP BY serviceName
 ),

 -- Sub query for RPS calculation
 rps_query AS (
     SELECT
-        stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
         serviceName,
         count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds
     FROM signoz_traces.signoz_index_v2
@@ -38,13 +37,13 @@ rps_query AS (
         AND msgSystem = '%s'
         AND stringTagMap['messaging.destination.name'] = '%s'
         AND stringTagMap['messaging.destination.partition.id'] = '%s'
-    GROUP BY consumer_group, serviceName
+        AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
+    GROUP BY serviceName
 ),

 -- Sub query for error rate calculation
 error_rate_query AS (
     SELECT
-        stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
         serviceName,
         count(*) / ((%d - %d) / 1000000000) AS error_rate -- Convert nanoseconds to seconds
     FROM signoz_traces.signoz_index_v2
@@ -56,13 +55,13 @@ error_rate_query AS (
         AND msgSystem = '%s'
         AND stringTagMap['messaging.destination.name'] = '%s'
         AND stringTagMap['messaging.destination.partition.id'] = '%s'
-    GROUP BY consumer_group, serviceName
+        AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
+    GROUP BY serviceName
 ),

 -- Sub query for average message size calculation
 avg_msg_size_query AS (
     SELECT
-        stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
         serviceName,
         avg(numberTagMap['messaging.message.body.size']) AS avg_msg_size
     FROM signoz_traces.signoz_index_v2
@@ -73,12 +72,12 @@ avg_msg_size_query AS (
         AND msgSystem = '%s'
         AND stringTagMap['messaging.destination.name'] = '%s'
         AND stringTagMap['messaging.destination.partition.id'] = '%s'
-    GROUP BY consumer_group, serviceName
+        AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
+    GROUP BY serviceName
 )

 -- Main query to combine all metrics
 SELECT
-    p99_query.consumer_group AS consumer_group,
     p99_query.serviceName AS service_name,
     p99_query.p99 AS p99,
     COALESCE(error_rate_query.error_rate, 0) AS error_rate,
@@ -86,15 +85,14 @@ SELECT
     COALESCE(avg_msg_size_query.avg_msg_size, 0) AS avg_msg_size
 FROM
     p99_query
-    LEFT JOIN rps_query ON p99_query.consumer_group = rps_query.consumer_group
-        AND p99_query.serviceName = rps_query.serviceName
-    LEFT JOIN error_rate_query ON p99_query.consumer_group = error_rate_query.consumer_group
-        AND p99_query.serviceName = error_rate_query.serviceName
-    LEFT JOIN avg_msg_size_query ON p99_query.consumer_group = avg_msg_size_query.consumer_group
-        AND p99_query.serviceName = avg_msg_size_query.serviceName
+    LEFT JOIN rps_query ON p99_query.serviceName = rps_query.serviceName
+    LEFT JOIN error_rate_query ON p99_query.serviceName = error_rate_query.serviceName
+    LEFT JOIN avg_msg_size_query ON p99_query.serviceName = avg_msg_size_query.serviceName
 ORDER BY
-    p99_query.consumer_group;
-`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, queueType, topic, partition)
+    p99_query.serviceName;
+`, start, end, queueType, topic, partition, consumerGroup, end, start, start, end, queueType, topic,
+		partition, consumerGroup, end, start, start, end, queueType, topic, partition,
+		consumerGroup, end, start, queueType, topic, partition, consumerGroup)
 	return query
 }
@@ -167,6 +165,7 @@ FROM
 ORDER BY
     p99_query.serviceName;
-`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition)
+`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic,
+		partition, end, start, start, end, queueType, topic, partition)
 	return query
 }
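
The rewritten generateConsumerSQL threads consumerGroup through every subquery as an extra positional `%s` argument to fmt.Sprintf, so the argument list must repeat it once per subquery, in the same order the placeholders appear. A minimal sketch of that pattern (simplified; not the actual SigNoz query):

```go
package main

import "fmt"

// buildFilter mimics the placeholder ordering used above: one '%s' per filter,
// so topic, partition and consumerGroup must be passed in exactly that order.
func buildFilter(topic, partition, consumerGroup string) string {
	return fmt.Sprintf(`
WHERE stringTagMap['messaging.destination.name'] = '%s'
  AND stringTagMap['messaging.destination.partition.id'] = '%s'
  AND stringTagMap['messaging.kafka.consumer.group'] = '%s'`,
		topic, partition, consumerGroup)
}

func main() {
	// With several such subqueries, consumerGroup appears once per subquery
	// in the argument list, mirroring the generator above.
	fmt.Println(buildFilter("topic1", "0", "cg1"))
}
```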

View File

@@ -44,16 +44,20 @@ func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
 	}

 	partition, ok := messagingQueue.Variables["partition"]
 	if !ok {
 		return nil, fmt.Errorf("invalid type for Partition")
 	}

+	consumerGroup, ok := messagingQueue.Variables["consumer_group"]
+	if !ok {
+		return nil, fmt.Errorf("invalid type for consumer group")
+	}
+
 	var query string
 	if queryContext == "producer" {
 		query = generateProducerSQL(start, end, topic, partition, queueType)
 	} else if queryContext == "consumer" {
-		query = generateConsumerSQL(start, end, topic, partition, queueType)
+		query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
 	}

 	return &v3.ClickHouseQuery{
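
A minimal sketch of the lookup-and-validate pattern added here, assuming the Variables field is a map[string]string as the handler above suggests: each required variable is fetched with the comma-ok idiom, and a consumer query without `consumer_group` fails fast with a descriptive error.

```go
package main

import "fmt"

// requiredVariable fetches a key from the request variables and returns an
// error when it is missing, mirroring the checks in buildClickHouseQuery.
func requiredVariable(vars map[string]string, key string) (string, error) {
	val, ok := vars[key]
	if !ok {
		return "", fmt.Errorf("missing required variable %q", key)
	}
	return val, nil
}

func main() {
	vars := map[string]string{"topic": "topic1", "partition": "0"}
	if _, err := requiredVariable(vars, "consumer_group"); err != nil {
		fmt.Println(err) // missing required variable "consumer_group"
	}
}
```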