diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go index eb06689ef3..fd50ac4e6f 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql.go @@ -95,3 +95,62 @@ ORDER BY throughput DESC `, timeRange, start, end, queueType, consumerGroup, partitionID) return query } + +func onboardProducersSQL(start, end int64, queueType string) string { + query := fmt.Sprintf(` +SELECT + CASE + WHEN COUNT(*) = 0 THEN 1 + WHEN SUM(msgSystem = '%s') = 0 THEN 2 + WHEN SUM(kind = 4) = 0 THEN 3 + WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 4 + WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 5 + ELSE 0 + END AS result_code +FROM signoz_traces.distributed_signoz_index_v2 +WHERE + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) + return query +} + +func onboardConsumerSQL(start, end int64, queueType string) string { + query := fmt.Sprintf(` +SELECT + CASE + WHEN COUNT(*) = 0 THEN 1 + WHEN SUM(msgSystem = '%s') = 0 THEN 2 + WHEN SUM(kind = 5) = 0 THEN 3 + WHEN SUM(serviceName IS NOT NULL) = 0 THEN 4 + WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 5 + WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 6 + WHEN SUM(has(stringTagMap, 'messaging.kafka.consumer.group')) = 0 THEN 7 + WHEN SUM(has(numberTagMap, 'messaging.message.body.size')) = 0 THEN 8 + WHEN SUM(has(stringTagMap, 'messaging.client_id')) = 0 THEN 9 + WHEN SUM(has(stringTagMap, 'service.instance.id')) = 0 THEN 10 + ELSE 0 + END AS result_code +FROM signoz_traces.distributed_signoz_index_v2 +WHERE + timestamp >= '%d' + AND timestamp <= '%d';`, queueType, start, end) + return query +} + +func onboardKafkaSQL(start, end int64) string { + query := fmt.Sprintf(` +SELECT + CASE + WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 + AND COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 1 + WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 THEN 2 + WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 3 + ELSE 0 + END AS result_code +FROM signoz_metrics.time_series_v4_1day +WHERE + metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag') + AND unix_milli >= '%d' + AND unix_milli < '%d';`, start, end) + return query +}