diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go index 61a504eaee..b24734cf48 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/model.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model.go @@ -1,7 +1,9 @@ package kafka +const kafkaQueue = "kafka" + type MessagingQueue struct { - Start int64 `json:"start"` - End int64 `json:"end"` + Start int64 `json:"start"` + End int64 `json:"end"` Variables map[string]string `json:"variables,omitempty"` } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index b22cc3f9aa..552fdc5d7a 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -4,7 +4,7 @@ import ( "fmt" ) -func generateConsumerSQL(start, end int64, topic, partition string) string { +func generateConsumerSQL(start, end int64, topic, partition, queueType string) string { query := fmt.Sprintf(` WITH -- Sub query for p99 calculation @@ -18,6 +18,7 @@ p99_query AS ( timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY consumer_group, serviceName @@ -34,6 +35,7 @@ rps_query AS ( timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY consumer_group, serviceName @@ -51,6 +53,7 @@ error_rate_query AS ( AND timestamp <= '%d' AND statusCode = 2 AND kind = 5 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY consumer_group, serviceName @@ -67,6 +70,7 @@ avg_msg_size_query AS ( timestamp >= '%d' AND timestamp <= '%d' AND kind = 5 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY consumer_group, serviceName @@ -90,11 +94,11 @@ FROM AND p99_query.serviceName = avg_msg_size_query.serviceName ORDER BY p99_query.consumer_group; -`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition, end, start, topic, partition) +`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, queueType, topic, partition) return query } -func generateProducerSQL(start, end int64, topic, partition string) string { +func generateProducerSQL(start, end int64, topic, partition, queueType string) string { query := fmt.Sprintf(` -- producer @@ -109,6 +113,7 @@ p99_query AS ( timestamp >= '%d' AND timestamp <= '%d' AND kind = 4 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY serviceName @@ -124,6 +129,7 @@ rps_query AS ( timestamp >= '%d' AND timestamp <= '%d' AND kind = 4 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY serviceName @@ -140,6 +146,7 @@ error_rate_query AS ( AND timestamp <= '%d' AND statusCode = 2 AND kind = 4 + AND msgSystem = '%s' AND stringTagMap['messaging.destination.name'] = '%s' AND stringTagMap['messaging.destination.partition.id'] = '%s' GROUP BY serviceName @@ -160,6 +167,6 @@ FROM ORDER BY p99_query.serviceName; -`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition) +`, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition, end, start, start, end, queueType, topic, partition) return query } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index c8e4abf9fa..5cb67d7bf0 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -10,9 +10,12 @@ var defaultStepInterval int64 = 60 func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { + // ToDo: propagate this through APIs when there are different handlers + queueType := kafkaQueue + var cq *v3.CompositeQuery if queryContext == "producer" { - chq, err := buildProducerClickHouseQuery(messagingQueue) + chq, err := buildProducerClickHouseQuery(messagingQueue, queueType) if err != nil { return nil, err } @@ -22,7 +25,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return nil, err } } else if queryContext == "consumer" { - chq, err := buildConsumerClickHouseQuery(messagingQueue) + chq, err := buildConsumerClickHouseQuery(messagingQueue, queueType) if err != nil { return nil, err } @@ -44,7 +47,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } -func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) { +func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End topic, ok := messagingQueue.Variables["topic"] @@ -57,14 +60,14 @@ func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHous if !ok { return nil, fmt.Errorf("invalid type for Partition") } - query := generateProducerSQL(start, end, topic, partition) + query := generateProducerSQL(start, end, topic, partition, queueType) return &v3.ClickHouseQuery{ Query: query, }, nil } -func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) { +func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End topic, ok := messagingQueue.Variables["topic"] @@ -77,7 +80,7 @@ func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHous if !ok { return nil, fmt.Errorf("invalid type for Partition") } - query := generateConsumerSQL(start, end, topic, partition) + query := generateConsumerSQL(start, end, topic, partition, queueType) return &v3.ClickHouseQuery{ Query: query,