mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-24 23:44:29 +08:00
Merge branch 'develop' into SIG-5729
This commit is contained in:
commit
a60674cf1b
@ -11,6 +11,7 @@ import (
|
||||
"net/http"
|
||||
"regexp"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -2463,12 +2464,19 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
|
||||
|
||||
// RegisterMessagingQueuesRoutes adds messaging-queues routes
|
||||
func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||
// SubRouter for kafka
|
||||
kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter()
|
||||
|
||||
kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
|
||||
kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
|
||||
kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)
|
||||
// SubRouter for kafka
|
||||
kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter()
|
||||
|
||||
consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter()
|
||||
consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
|
||||
consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
|
||||
consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)
|
||||
|
||||
onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter()
|
||||
onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost)
|
||||
onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost)
|
||||
onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost)
|
||||
|
||||
// for other messaging queues, add SubRouters here
|
||||
}
|
||||
@ -2478,6 +2486,332 @@ func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string
|
||||
return clientID + separator + serviceInstanceID + separator + serviceName
|
||||
}
|
||||
|
||||
func (aH *APIHandler) onboardProducers(
|
||||
|
||||
w http.ResponseWriter, r *http.Request,
|
||||
|
||||
) {
|
||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
||||
if apiErr != nil {
|
||||
zap.L().Error(apiErr.Err.Error())
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers")
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error(err.Error())
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||
|
||||
if err != nil {
|
||||
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
RespondError(w, apiErrObj, err)
|
||||
return
|
||||
}
|
||||
|
||||
var entries []mq.OnboardingResponse
|
||||
|
||||
for _, result := range results {
|
||||
|
||||
for key, value := range result.Data {
|
||||
var message, attribute, status string
|
||||
|
||||
intValue := int(*value.(*uint8))
|
||||
|
||||
if key == "entries" {
|
||||
attribute = "telemetry ingestion"
|
||||
if intValue != 0 {
|
||||
entries = nil
|
||||
entry := mq.OnboardingResponse{
|
||||
Attribute: attribute,
|
||||
Message: "No data available in the given time range",
|
||||
Status: "0",
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
break
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "queue" {
|
||||
attribute = "messaging.system"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.system attribute is not present or not equal to kafka in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "kind" {
|
||||
attribute = "kind"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "check if your producer spans has kind=4 as attribute"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "destination" {
|
||||
attribute = "messaging.destination.name"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.destination.name attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "partition" {
|
||||
attribute = "messaging.destination.partition.id"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.destination.partition.id attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
}
|
||||
|
||||
entry := mq.OnboardingResponse{
|
||||
Attribute: attribute,
|
||||
Message: message,
|
||||
Status: status,
|
||||
}
|
||||
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
return entries[i].Attribute < entries[j].Attribute
|
||||
})
|
||||
|
||||
aH.Respond(w, entries)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) onboardConsumers(
|
||||
|
||||
w http.ResponseWriter, r *http.Request,
|
||||
|
||||
) {
|
||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
||||
if apiErr != nil {
|
||||
zap.L().Error(apiErr.Err.Error())
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_consumers")
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error(err.Error())
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||
|
||||
if err != nil {
|
||||
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
RespondError(w, apiErrObj, err)
|
||||
return
|
||||
}
|
||||
|
||||
var entries []mq.OnboardingResponse
|
||||
|
||||
for _, result := range result {
|
||||
for key, value := range result.Data {
|
||||
var message, attribute, status string
|
||||
|
||||
intValue := int(*value.(*uint8))
|
||||
|
||||
if key == "entries" {
|
||||
attribute = "telemetry ingestion"
|
||||
if intValue != 0 {
|
||||
entries = nil
|
||||
entry := mq.OnboardingResponse{
|
||||
Attribute: attribute,
|
||||
Message: "No data available in the given time range",
|
||||
Status: "0",
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
break
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "queue" {
|
||||
attribute = "messaging.system"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.system attribute is not present or not equal to kafka in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "kind" {
|
||||
attribute = "kind"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "check if your consumer spans has kind=5 as attribute"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "destination" {
|
||||
attribute = "messaging.destination.name"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.destination.name attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "partition" {
|
||||
attribute = "messaging.destination.partition.id"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.destination.partition.id attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "svc" {
|
||||
attribute = "service_name"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "service_name attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "cgroup" {
|
||||
attribute = "messaging.kafka.consumer.group"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.kafka.consumer.group attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "bodysize" {
|
||||
attribute = "messaging.message.body.size"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.message.body.size attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "clientid" {
|
||||
attribute = "messaging.client_id"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "messaging.client_id attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "instanceid" {
|
||||
attribute = "service.instance.id"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "service.instance.id attribute is not present in your spans"
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
}
|
||||
|
||||
entry := mq.OnboardingResponse{
|
||||
Attribute: attribute,
|
||||
Message: message,
|
||||
Status: status,
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
return entries[i].Attribute < entries[j].Attribute
|
||||
})
|
||||
|
||||
aH.Respond(w, entries)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) onboardKafka(
|
||||
|
||||
w http.ResponseWriter, r *http.Request,
|
||||
|
||||
) {
|
||||
messagingQueue, apiErr := ParseMessagingQueueBody(r)
|
||||
if apiErr != nil {
|
||||
zap.L().Error(apiErr.Err.Error())
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_kafka")
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error(err.Error())
|
||||
RespondError(w, apiErr, nil)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
|
||||
|
||||
if err != nil {
|
||||
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
RespondError(w, apiErrObj, err)
|
||||
return
|
||||
}
|
||||
|
||||
var entries []mq.OnboardingResponse
|
||||
|
||||
for _, result := range result {
|
||||
for key, value := range result.Data {
|
||||
var message, attribute, status string
|
||||
|
||||
intValue := int(*value.(*uint8))
|
||||
|
||||
if key == "entries" {
|
||||
attribute = "telemetry ingestion"
|
||||
if intValue != 0 {
|
||||
entries = nil
|
||||
entry := mq.OnboardingResponse{
|
||||
Attribute: attribute,
|
||||
Message: "No data available in the given time range",
|
||||
Status: "0",
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
break
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "fetchlatency" {
|
||||
attribute = "kafka_consumer_fetch_latency_avg"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range."
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
} else if key == "grouplag" {
|
||||
attribute = "kafka_consumer_group_lag"
|
||||
if intValue != 0 {
|
||||
status = "0"
|
||||
message = "Metric kafka_consumer_group_lag is not present in the given time range."
|
||||
} else {
|
||||
status = "1"
|
||||
}
|
||||
}
|
||||
|
||||
entry := mq.OnboardingResponse{
|
||||
Attribute: attribute,
|
||||
Message: message,
|
||||
Status: status,
|
||||
}
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
return entries[i].Attribute < entries[j].Attribute
|
||||
})
|
||||
|
||||
aH.Respond(w, entries)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getNetworkData(
|
||||
w http.ResponseWriter, r *http.Request,
|
||||
) {
|
||||
|
@ -226,3 +226,296 @@ Response in query range `table` format
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Onboarding APIs
|
||||
|
||||
```
|
||||
/api/v1/messaging-queues/kafka/onboarding/producers
|
||||
```
|
||||
|
||||
```json
|
||||
{
|
||||
"start": 1727620544260611424,
|
||||
"end": 1727620556401481428
|
||||
}
|
||||
```
|
||||
|
||||
```json
|
||||
// everything is present
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "kind",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.destination.name",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.destination.partition.id",
|
||||
"error_message": "messaging.destination.partition.id attribute is not present in your spans",
|
||||
"status": "0"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.system",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
// partial attributes are present
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "kind",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.destination.name",
|
||||
"error_message": "messaging.destination.name attribute is not present in your spans",
|
||||
"status": "0"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.destination.partition.id",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.system",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
// no data available
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "No data available in the given time range",
|
||||
"status": "0"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
```
|
||||
/api/v1/messaging-queues/kafka/onboarding/consumers
|
||||
```
|
||||
|
||||
```json
|
||||
// everything is present
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "kind",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.client_id",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.destination.name",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.destination.partition.id",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.kafka.consumer.group",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.message.body.size",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.system",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "service.instance.id",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "service_name",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
// partial attributes are present
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "kind",
|
||||
"error_message": "check if your consumer spans has kind=5 as attribute",
|
||||
"status": "0"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.client_id",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.destination.name",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.destination.partition.id",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.kafka.consumer.group",
|
||||
"error_message": "messaging.kafka.consumer.group attribute is not present in your spans",
|
||||
"status": "0"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.message.body.size",
|
||||
"error_message": "messaging.message.body.size attribute is not present in your spans",
|
||||
"status": "0"
|
||||
},
|
||||
{
|
||||
"attribute": "messaging.system",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "service.instance.id",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "service_name",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
// no data available
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "No data available in the given time range",
|
||||
"status": "0"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
```
|
||||
/api/v1/messaging-queues/kafka/onboarding/kafka
|
||||
```
|
||||
|
||||
```json
|
||||
{
|
||||
"start": 1728485200000000000,
|
||||
"end": 1728749800000000000
|
||||
}
|
||||
```
|
||||
|
||||
```json
|
||||
// metrics are not present
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "kafka_consumer_fetch_latency_avg",
|
||||
"error_message": "Metric kafka_consumer_fetch_latency_avg is not present in the given time range.",
|
||||
"status": "0"
|
||||
},
|
||||
{
|
||||
"attribute": "kafka_consumer_group_lag",
|
||||
"error_message": "Metric kafka_consumer_group_lag is not present in the given time range.",
|
||||
"status": "0"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
// everything is present
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "kafka_consumer_fetch_latency_avg",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
},
|
||||
{
|
||||
"attribute": "kafka_consumer_group_lag",
|
||||
"error_message": "",
|
||||
"status": "1"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
// no data available
|
||||
{
|
||||
"status": "success",
|
||||
"data": [
|
||||
{
|
||||
"attribute": "telemetry ingestion",
|
||||
"error_message": "No data available in the given time range",
|
||||
"status": "0"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
@ -1,6 +1,6 @@
|
||||
package kafka
|
||||
|
||||
const kafkaQueue = "kafka"
|
||||
const KafkaQueue = "kafka"
|
||||
|
||||
type MessagingQueue struct {
|
||||
Start int64 `json:"start"`
|
||||
@ -14,3 +14,9 @@ type Clients struct {
|
||||
ServiceInstanceID []string
|
||||
ServiceName []string
|
||||
}
|
||||
|
||||
// OnboardingResponse is a single attribute/metric check result returned by
// the messaging-queues onboarding endpoints.
type OnboardingResponse struct {
	// Attribute is the span attribute or metric name that was checked.
	Attribute string `json:"attribute"`
	// Message explains why the check failed; empty when the check passed.
	Message string `json:"error_message"`
	// Status is "1" when the attribute/metric was found, "0" otherwise.
	Status string `json:"status"`
}
|
||||
|
@ -95,3 +95,54 @@ ORDER BY throughput DESC
|
||||
`, timeRange, start, end, queueType, consumerGroup, partitionID)
|
||||
return query
|
||||
}
|
||||
|
||||
// onboardProducersSQL returns the ClickHouse query used by producer
// onboarding. Every selected column is a UInt8 flag that is 1 when the
// corresponding attribute is MISSING from all spans in [start, end].
func onboardProducersSQL(start, end int64, queueType string) string {
	const tmpl = `
SELECT
	COUNT(*) = 0 AS entries,
	COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue,
	COUNT(IF(kind = 4, 1, NULL)) = 0 AS kind,
	COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination,
	COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition
FROM
	signoz_traces.distributed_signoz_index_v2
WHERE
	timestamp >= '%d'
	AND timestamp <= '%d';`
	return fmt.Sprintf(tmpl, queueType, start, end)
}
|
||||
|
||||
// onboardConsumerSQL returns the ClickHouse query used by consumer
// onboarding. Every selected column is a UInt8 flag that is 1 when the
// corresponding attribute is MISSING from all spans in [start, end].
func onboardConsumerSQL(start, end int64, queueType string) string {
	const tmpl = `
SELECT
	COUNT(*) = 0 AS entries,
	COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue,
	COUNT(IF(kind = 5, 1, NULL)) = 0 AS kind,
	COUNT(serviceName) = 0 AS svc,
	COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination,
	COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition,
	COUNT(IF(has(stringTagMap, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup,
	COUNT(IF(has(numberTagMap, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize,
	COUNT(IF(has(stringTagMap, 'messaging.client_id'), 1, NULL)) = 0 AS clientid,
	COUNT(IF(has(stringTagMap, 'service.instance.id'), 1, NULL)) = 0 AS instanceid
FROM signoz_traces.distributed_signoz_index_v2
WHERE
	timestamp >= '%d'
	AND timestamp <= '%d';`
	return fmt.Sprintf(tmpl, queueType, start, end)
}
|
||||
|
||||
// onboardKafkaSQL returns the ClickHouse query used by Kafka metrics
// onboarding. Each selected column is a UInt8 flag that is 1 when the
// corresponding metric has no samples in the window. start and end are
// nanosecond timestamps and are converted to milliseconds for unix_milli.
func onboardKafkaSQL(start, end int64) string {
	const tmpl = `
SELECT
	COUNT(*) = 0 AS entries,
	COUNT(IF(metric_name = 'kafka_consumer_fetch_latency_avg', 1, NULL)) = 0 AS fetchlatency,
	COUNT(IF(metric_name = 'kafka_consumer_group_lag', 1, NULL)) = 0 AS grouplag
FROM
	signoz_metrics.time_series_v4_1day
WHERE
	metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag')
	AND unix_milli >= '%d'
	AND unix_milli < '%d';`
	return fmt.Sprintf(tmpl, start/1000000, end/1000000)
}
|
||||
|
@ -13,16 +13,16 @@ var defaultStepInterval int64 = 60
|
||||
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
|
||||
|
||||
// ToDo: propagate this through APIs when there are different handlers
|
||||
queueType := kafkaQueue
|
||||
queueType := KafkaQueue
|
||||
|
||||
var cq *v3.CompositeQuery
|
||||
|
||||
chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext)
|
||||
chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var cq *v3.CompositeQuery
|
||||
|
||||
cq, err = buildCompositeQuery(chq, queryContext)
|
||||
|
||||
queryRangeParams := &v3.QueryRangeParamsV3{
|
||||
@ -37,6 +37,14 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
|
||||
return queryRangeParams, nil
|
||||
}
|
||||
|
||||
func PrepareClikhouseQueries(messagingQueue *MessagingQueue, queryContext string) (*v3.ClickHouseQuery, error) {
|
||||
queueType := KafkaQueue
|
||||
|
||||
chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
|
||||
|
||||
return chq, err
|
||||
}
|
||||
|
||||
func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
|
||||
start := messagingQueue.Start
|
||||
end := messagingQueue.End
|
||||
@ -137,7 +145,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
|
||||
|
||||
func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
|
||||
|
||||
queueType := kafkaQueue
|
||||
queueType := KafkaQueue
|
||||
|
||||
unixMilliStart := messagingQueue.Start / 1000000
|
||||
unixMilliEnd := messagingQueue.End / 1000000
|
||||
@ -177,17 +185,22 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a
|
||||
return queryRangeParams, nil
|
||||
}
|
||||
|
||||
func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
|
||||
func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
|
||||
start := messagingQueue.Start
|
||||
end := messagingQueue.End
|
||||
topic, ok := messagingQueue.Variables["topic"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid type for Topic")
|
||||
}
|
||||
|
||||
partition, ok := messagingQueue.Variables["partition"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid type for Partition")
|
||||
var topic, partition string
|
||||
|
||||
if queryContext == "producer" || queryContext == "consumer" {
|
||||
var ok bool
|
||||
topic, ok = messagingQueue.Variables["topic"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid type for Topic")
|
||||
}
|
||||
partition, ok = messagingQueue.Variables["partition"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("invalid type for Partition")
|
||||
}
|
||||
}
|
||||
|
||||
var query string
|
||||
@ -199,6 +212,12 @@ func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
|
||||
return nil, fmt.Errorf("invalid type for consumer group")
|
||||
}
|
||||
query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
|
||||
} else if queryContext == "onboard_producers" {
|
||||
query = onboardProducersSQL(start, end, queueType)
|
||||
} else if queryContext == "onboard_consumers" {
|
||||
query = onboardConsumerSQL(start, end, queueType)
|
||||
} else if queryContext == "onboard_kafka" {
|
||||
query = onboardKafkaSQL(start, end)
|
||||
}
|
||||
|
||||
return &v3.ClickHouseQuery{
|
||||
|
Loading…
x
Reference in New Issue
Block a user