chore: addressing review comments

This commit is contained in:
shivanshu 2024-08-05 18:14:40 +05:30
parent 3ff0aa4b4b
commit c957c0f757
No known key found for this signature in database
GPG Key ID: 0F9ACBC3AA12DC71
3 changed files with 37 additions and 133 deletions

View File

@ -2252,7 +2252,6 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth
// SubRouter for kafka // SubRouter for kafka
kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter() kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter()
//kafkaSubRouter.HandleFunc("/consumer-lag", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)

View File

@ -86,7 +86,7 @@ response in query range format `series`
API endpoint: API endpoint:
``` ```
POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details POST /api/v1/messaging-queues/kafka/consumer-lag/producer-details
``` ```
```json ```json

View File

@ -5,167 +5,72 @@ import (
) )
func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string { func generateConsumerSQL(start, end int64, topic, partition, consumerGroup, queueType string) string {
timeRange := (end - start) / 1000000000
query := fmt.Sprintf(` query := fmt.Sprintf(`
WITH WITH consumer_query AS (
-- Sub query for p99 calculation
p99_query AS (
SELECT SELECT
serviceName, serviceName,
quantile(0.99)(durationNano) / 1000000 as p99 quantile(0.99)(durationNano) / 1000000 AS p99,
FROM signoz_traces.signoz_index_v2 COUNT(*) AS total_requests,
WHERE SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count,
timestamp >= '%d' avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size
AND timestamp <= '%d' FROM signoz_traces.distributed_signoz_index_v2
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
GROUP BY serviceName
),
-- Sub query for RPS calculation
rps_query AS (
SELECT
serviceName,
count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds
FROM signoz_traces.signoz_index_v2
WHERE WHERE
timestamp >= '%d' timestamp >= '%d'
AND timestamp <= '%d' AND timestamp <= '%d'
AND kind = 5 AND kind = 5
AND msgSystem = '%s' AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s'
AND stringTagMap['messaging.kafka.consumer.group'] = '%s' AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
GROUP BY serviceName
),
-- Sub query for error rate calculation
error_rate_query AS (
SELECT
serviceName,
count(*) / ((%d - %d) / 1000000000) AS error_rate -- Convert nanoseconds to seconds
FROM signoz_traces.signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND statusCode = 2
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
GROUP BY serviceName
),
-- Sub query for average message size calculation
avg_msg_size_query AS (
SELECT
serviceName,
avg(numberTagMap['messaging.message.body.size']) AS avg_msg_size
FROM signoz_traces.signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
GROUP BY serviceName GROUP BY serviceName
) )
-- Main query to combine all metrics -- Main query to select all metrics
SELECT SELECT
p99_query.serviceName AS service_name, serviceName AS service_name,
p99_query.p99 AS p99, p99,
COALESCE(error_rate_query.error_rate, 0) AS error_rate, COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
COALESCE(rps_query.rps, 0) AS throughput, COALESCE(total_requests / %d, 0) AS throughput, -- Convert nanoseconds to seconds
COALESCE(avg_msg_size_query.avg_msg_size, 0) AS avg_msg_size COALESCE(avg_msg_size, 0) AS avg_msg_size
FROM FROM
p99_query consumer_query
LEFT JOIN rps_query ON p99_query.serviceName = rps_query.serviceName
LEFT JOIN error_rate_query ON p99_query.serviceName = error_rate_query.serviceName
LEFT JOIN avg_msg_size_query ON p99_query.serviceName = avg_msg_size_query.serviceName
ORDER BY ORDER BY
p99_query.serviceName; serviceName;
`, start, end, queueType, topic, partition, consumerGroup, end, start, start, end, queueType, topic, `, start, end, queueType, topic, partition, consumerGroup, timeRange)
partition, consumerGroup, end, start, start, end, queueType, topic, partition,
consumerGroup, end, start, queueType, topic, partition, consumerGroup)
return query return query
} }
func generateProducerSQL(start, end int64, topic, partition, queueType string) string { func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
query := fmt.Sprintf(` query := fmt.Sprintf(`
WITH producer_query AS (
-- producer
WITH
-- Subquery for p99 calculation
p99_query AS (
SELECT SELECT
serviceName, serviceName,
quantile(0.99)(durationNano) / 1000000 as p99 quantile(0.99)(durationNano) / 1000000 AS p99,
FROM signoz_traces.signoz_index_v2 count(*) AS total_count,
SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count
FROM signoz_traces.distributed_signoz_index_v2
WHERE WHERE
timestamp >= '%d' timestamp >= '%d'
AND timestamp <= '%d' AND timestamp <= '%d'
AND kind = 4 AND kind = 4
AND msgSystem = '%s' AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY serviceName
),
-- Subquery for RPS calculation
rps_query AS (
SELECT
serviceName,
count(*) / ((%d - %d) / 1000000000) as rps -- Convert nanoseconds to seconds
FROM signoz_traces.signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 4
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY serviceName
),
-- Subquery for error rate calculation
error_rate_query AS (
SELECT
serviceName,
count(*) / ((%d - %d) / 1000000000) as error_rate -- Convert nanoseconds to seconds
FROM signoz_traces.signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND statusCode = 2
AND kind = 4
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY serviceName GROUP BY serviceName
) )
-- Main query to combine all metrics
SELECT SELECT
p99_query.serviceName AS service_name, serviceName AS service_name,
p99_query.p99, p99,
COALESCE(error_rate_query.error_rate, 0) AS error_rate, COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage,
COALESCE(rps_query.rps, 0) AS rps COALESCE(total_count / %d, 0) AS rps -- Convert nanoseconds to seconds
FROM FROM
p99_query producer_query
LEFT JOIN
rps_query ON p99_query.serviceName = rps_query.serviceName
LEFT JOIN
error_rate_query ON p99_query.serviceName = error_rate_query.serviceName
ORDER BY ORDER BY
p99_query.serviceName; serviceName;
`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, `, start, end, queueType, topic, partition, timeRange)
partition, end, start, start, end, queueType, topic, partition)
return query return query
} }