chore: add queue type

This commit is contained in:
shivanshu 2024-07-26 13:02:45 +05:30
parent 481bb6e8b8
commit 5c3ce146fa
No known key found for this signature in database
GPG Key ID: 0F9ACBC3AA12DC71
3 changed files with 24 additions and 12 deletions

View File

@ -1,7 +1,9 @@
package kafka
const kafkaQueue = "kafka"
type MessagingQueue struct {
Start int64 `json:"start"`
End int64 `json:"end"`
Start int64 `json:"start"`
End int64 `json:"end"`
Variables map[string]string `json:"variables,omitempty"`
}

View File

@ -4,7 +4,7 @@ import (
"fmt"
)
func generateConsumerSQL(start, end int64, topic, partition string) string {
func generateConsumerSQL(start, end int64, topic, partition, queueType string) string {
query := fmt.Sprintf(`
WITH
-- Sub query for p99 calculation
@ -18,6 +18,7 @@ p99_query AS (
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY consumer_group, serviceName
@ -34,6 +35,7 @@ rps_query AS (
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY consumer_group, serviceName
@ -51,6 +53,7 @@ error_rate_query AS (
AND timestamp <= '%d'
AND statusCode = 2
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY consumer_group, serviceName
@ -67,6 +70,7 @@ avg_msg_size_query AS (
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY consumer_group, serviceName
@ -90,11 +94,11 @@ FROM
AND p99_query.serviceName = avg_msg_size_query.serviceName
ORDER BY
p99_query.consumer_group;
`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition, end, start, topic, partition)
`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, queueType, topic, partition)
return query
}
func generateProducerSQL(start, end int64, topic, partition string) string {
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
query := fmt.Sprintf(`
-- producer
@ -109,6 +113,7 @@ p99_query AS (
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 4
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY serviceName
@ -124,6 +129,7 @@ rps_query AS (
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 4
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY serviceName
@ -140,6 +146,7 @@ error_rate_query AS (
AND timestamp <= '%d'
AND statusCode = 2
AND kind = 4
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY serviceName
@ -160,6 +167,6 @@ FROM
ORDER BY
p99_query.serviceName;
`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition)
`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition)
return query
}

View File

@ -10,9 +10,12 @@ var defaultStepInterval int64 = 60
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
// ToDo: propagate this through APIs when there are different handlers
queueType := kafkaQueue
var cq *v3.CompositeQuery
if queryContext == "producer" {
chq, err := buildProducerClickHouseQuery(messagingQueue)
chq, err := buildProducerClickHouseQuery(messagingQueue, queueType)
if err != nil {
return nil, err
}
@ -22,7 +25,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
return nil, err
}
} else if queryContext == "consumer" {
chq, err := buildConsumerClickHouseQuery(messagingQueue)
chq, err := buildConsumerClickHouseQuery(messagingQueue, queueType)
if err != nil {
return nil, err
}
@ -44,7 +47,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
return queryRangeParams, nil
}
func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) {
func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
topic, ok := messagingQueue.Variables["topic"]
@ -57,14 +60,14 @@ func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHous
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
}
query := generateProducerSQL(start, end, topic, partition)
query := generateProducerSQL(start, end, topic, partition, queueType)
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) {
func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
topic, ok := messagingQueue.Variables["topic"]
@ -77,7 +80,7 @@ func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHous
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
}
query := generateConsumerSQL(start, end, topic, partition)
query := generateConsumerSQL(start, end, topic, partition, queueType)
return &v3.ClickHouseQuery{
Query: query,