feat: restructure Kafka messaging-queue routes, optimise onboarding queries, and return structured per-attribute onboarding responses

This commit is contained in:
shivanshu 2024-10-04 01:10:13 +05:30
parent ea0263cc73
commit d380894c35
No known key found for this signature in database
GPG Key ID: 0F9ACBC3AA12DC71
4 changed files with 283 additions and 189 deletions

View File

@ -11,6 +11,7 @@ import (
"net/http" "net/http"
"regexp" "regexp"
"slices" "slices"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -2463,15 +2464,19 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
// RegisterMessagingQueuesRoutes adds messaging-queues routes // RegisterMessagingQueuesRoutes adds messaging-queues routes
func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) { func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {
// SubRouter for kafka
kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter()
kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) // SubRouter for kafka
kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter()
kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/onboarding-producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter()
kafkaSubRouter.HandleFunc("/onboarding-consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/onboarding-kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)
onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter()
onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost)
onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost)
onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost)
// for other messaging queues, add SubRouters here // for other messaging queues, add SubRouters here
} }
@ -2493,69 +2498,94 @@ func (aH *APIHandler) onboardProducers(
return return
} }
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_producers") chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers")
if err != nil { if err != nil {
zap.L().Error(err.Error()) zap.L().Error(err.Error())
RespondError(w, apiErr, nil) RespondError(w, apiErr, nil)
return return
} }
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { results, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil { if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName) RespondError(w, apiErrObj, err)
return return
} }
transformedSeries := &v3.Series{ var entries []mq.OnboardingResponse
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0),
}
var value string for _, result := range results {
for _, result := range result { for key, value := range result.Data {
for _, series := range result.Series { var message, attribute, status string
for _, point := range series.Points {
switch point.Value { intValue := int(*value.(*uint8))
case 0:
value = "All attributes are present and meet the conditions" if key == "entries" {
case 1: attribute = "telemetry ingestion"
value = "No data available in the given time range" if intValue != 0 {
case 2: entries = nil
value = "messaging.system attribute is not present or not equal to kafka in your spans" entry := mq.OnboardingResponse{
case 3: Attribute: attribute,
value = "check if your producer spans has kind producer" Message: "No data available in the given time range",
case 4: Status: "0",
value = "messaging.destination.name attribute is not present in your spans" }
case 5: entries = append(entries, entry)
value = "messaging.destination.partition.id attribute is not present in your spans" break
default: } else {
value = "Unknown problem occurred, try increasing the time" status = "1"
}
} else if key == "queue" {
attribute = "messaging.system"
if intValue != 0 {
status = "0"
message = "messaging.system attribute is not present or not equal to kafka in your spans"
} else {
status = "1"
}
} else if key == "kind" {
attribute = "kind"
if intValue != 0 {
status = "0"
message = "check if your producer spans has kind=4 as attribute"
} else {
status = "1"
}
} else if key == "destination" {
attribute = "messaging.destination.name"
if intValue != 0 {
status = "0"
message = "messaging.destination.name attribute is not present in your spans"
} else {
status = "1"
}
} else if key == "partition" {
attribute = "messaging.destination.partition.id"
if intValue != 0 {
status = "0"
message = "messaging.destination.partition.id attribute is not present in your spans"
} else {
status = "1"
} }
} }
entry := mq.OnboardingResponse{
Attribute: attribute,
Message: message,
Status: status,
}
entries = append(entries, entry)
} }
} }
transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) sort.Slice(entries, func(i, j int) bool {
return entries[i].Attribute < entries[j].Attribute
})
for _, result := range result { aH.Respond(w, entries)
for i := range result.Series {
result.Series[i] = transformedSeries
}
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
} }
func (aH *APIHandler) onboardConsumers( func (aH *APIHandler) onboardConsumers(
@ -2570,79 +2600,128 @@ func (aH *APIHandler) onboardConsumers(
return return
} }
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_producers")
if err != nil { if err != nil {
zap.L().Error(err.Error()) zap.L().Error(err.Error())
RespondError(w, apiErr, nil) RespondError(w, apiErr, nil)
return return
} }
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { result, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil { if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName) RespondError(w, apiErrObj, err)
return return
} }
transformedSeries := &v3.Series{ var entries []mq.OnboardingResponse
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0),
}
var value string
for _, result := range result { for _, result := range result {
for _, series := range result.Series { for key, value := range result.Data {
for _, point := range series.Points { var message, attribute, status string
switch point.Value {
case 0: intValue := int(*value.(*uint8))
value = "All attributes are present and meet the conditions"
case 1: if key == "entries" {
value = "No data available in the given time range" attribute = "telemetry ingestion"
case 2: if intValue != 0 {
value = "msgSystem attribute is not present or not equal to 'kafka' in your spans" entries = nil
case 3: entry := mq.OnboardingResponse{
value = "kind attribute is not present or not equal to 5 in your spans" Attribute: attribute,
case 4: Message: "No data available in the given time range",
value = "serviceName attribute is not present in your spans" Status: "0",
case 5: }
value = "messaging.destination.name attribute is not present in your spans" entries = append(entries, entry)
case 6: break
value = "messaging.destination.partition.id attribute is not present in your spans" } else {
case 7: status = "1"
value = "messaging.kafka.consumer.group attribute is not present in your spans" }
case 8: } else if key == "queue" {
value = "messaging.message.body.size attribute is not present in your spans" attribute = "messaging.system"
case 9: if intValue != 0 {
value = "messaging.client_id attribute is not present in your spans" status = "0"
case 10: message = "messaging.system attribute is not present or not equal to kafka in your spans"
value = "service.instance.id attribute is not present in your spans" } else {
default: status = "1"
value = "Unknown problem occurred, try increasing the time" }
} else if key == "kind" {
attribute = "kind"
if intValue != 0 {
status = "0"
message = "check if your producer spans has kind=4 as attribute"
} else {
status = "1"
}
} else if key == "destination" {
attribute = "messaging.destination.name"
if intValue != 0 {
status = "0"
message = "messaging.destination.name attribute is not present in your spans"
} else {
status = "1"
}
} else if key == "partition" {
attribute = "messaging.destination.partition.id"
if intValue != 0 {
status = "0"
message = "messaging.destination.partition.id attribute is not present in your spans"
} else {
status = "1"
}
} else if key == "svc" {
attribute = "service_name"
if intValue != 0 {
status = "0"
message = "service_name attribute is not present in your spans"
} else {
status = "1"
}
} else if key == "cgroup" {
attribute = "messaging.kafka.consumer.group"
if intValue != 0 {
status = "0"
message = "messaging.kafka.consumer.group attribute is not present in your spans"
} else {
status = "1"
}
} else if key == "bodysize" {
attribute = "messaging.message.body.size"
if intValue != 0 {
status = "0"
message = "messaging.message.body.size attribute is not present in your spans"
} else {
status = "1"
}
} else if key == "clientid" {
attribute = "messaging.client_id"
if intValue != 0 {
status = "0"
message = "messaging.client_id attribute is not present in your spans"
} else {
status = "1"
}
} else if key == "instanceid" {
attribute = "service.instance.id"
if intValue != 0 {
status = "0"
message = "service.instance.id attribute is not present in your spans"
} else {
status = "1"
} }
} }
entry := mq.OnboardingResponse{
Attribute: attribute,
Message: message,
Status: status,
}
entries = append(entries, entry)
} }
} }
transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) aH.Respond(w, entries)
for _, result := range result {
for i := range result.Series {
result.Series[i] = transformedSeries
}
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
} }
func (aH *APIHandler) onboardKafka( func (aH *APIHandler) onboardKafka(
@ -2657,63 +2736,72 @@ func (aH *APIHandler) onboardKafka(
return return
} }
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") chq, err := mq.BuildClickHouseQuery(messagingQueue, mq.KafkaQueue, "onboard_kafka")
if err != nil { if err != nil {
zap.L().Error(err.Error()) zap.L().Error(err.Error())
RespondError(w, apiErr, nil) RespondError(w, apiErr, nil)
return return
} }
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { result, err := aH.reader.GetListResultV3(r.Context(), chq.Query)
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil { if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName) RespondError(w, apiErrObj, err)
return return
} }
transformedSeries := &v3.Series{ var entries []mq.OnboardingResponse
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0),
}
var value string
for _, result := range result { for _, result := range result {
for _, series := range result.Series { for key, value := range result.Data {
for _, point := range series.Points { var message, attribute, status string
switch point.Value {
case 0: intValue := int(*value.(*uint8))
value = "All required metrics are present and meet the conditions"
case 1: if key == "entries" {
value = "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range." attribute = "telemetry ingestion"
case 2: if intValue != 0 {
value = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range." entries = nil
default: entry := mq.OnboardingResponse{
value = "Unknown problem occurred, try increasing the time" Attribute: attribute,
Message: "No data available in the given time range",
Status: "0",
}
entries = append(entries, entry)
break
} else {
status = "1"
}
} else if key == "fetchlatency" {
attribute = "kafka_consumer_fetch_latency_avg"
if intValue != 0 {
status = "0"
message = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range."
} else {
status = "1"
}
} else if key == "grouplag" {
attribute = "kafka_consumer_group_lag"
if intValue != 0 {
status = "0"
message = "Metric kafka_consumer_group_lag is not present in the given time range."
} else {
status = "1"
} }
} }
entry := mq.OnboardingResponse{
Attribute: attribute,
Message: message,
Status: status,
}
entries = append(entries, entry)
} }
} }
transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) aH.Respond(w, entries)
for _, result := range result {
for i := range result.Series {
result.Series[i] = transformedSeries
}
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
} }
func (aH *APIHandler) getNetworkData( func (aH *APIHandler) getNetworkData(

View File

@ -1,6 +1,6 @@
package kafka package kafka
const kafkaQueue = "kafka" const KafkaQueue = "kafka"
type MessagingQueue struct { type MessagingQueue struct {
Start int64 `json:"start"` Start int64 `json:"start"`
@ -14,3 +14,9 @@ type Clients struct {
ServiceInstanceID []string ServiceInstanceID []string
ServiceName []string ServiceName []string
} }
// OnboardingResponse is a single onboarding check result returned by the
// messaging-queue onboarding endpoints.
type OnboardingResponse struct {
	// Attribute is the span attribute or metric the check looked for.
	Attribute string `json:"attribute"`
	// Message explains why the check failed; empty when the check passed.
	Message string `json:"error_message"`
	// Status is "1" when the check passed and "0" when it failed.
	Status string `json:"status"`
}

View File

@ -99,58 +99,50 @@ ORDER BY throughput DESC
func onboardProducersSQL(start, end int64, queueType string) string { func onboardProducersSQL(start, end int64, queueType string) string {
query := fmt.Sprintf(` query := fmt.Sprintf(`
SELECT SELECT
CASE COUNT(*) = 0 AS entries,
WHEN COUNT(*) = 0 THEN 1 COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue,
WHEN SUM(msgSystem = '%s') = 0 THEN 2 COUNT(IF(kind = 4, 1, NULL)) = 0 AS kind,
WHEN SUM(kind = 4) = 0 THEN 3 COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination,
WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 4 COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition
WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 5 FROM
ELSE 0 signoz_traces.distributed_signoz_index_v2
END AS result_code
FROM signoz_traces.distributed_signoz_index_v2
WHERE WHERE
timestamp >= '%d' timestamp >= '%d'
AND timestamp <= '%d';`, queueType, start, end) AND timestamp <= '%d';`, queueType, start, end)
return query return query
} }
func onboardConsumerSQL(start, end int64, queueType string) string { func onboardConsumerSQL(start, end int64, queueType string) string {
query := fmt.Sprintf(` query := fmt.Sprintf(`
SELECT SELECT
CASE COUNT(*) = 0 AS entries,
WHEN COUNT(*) = 0 THEN 1 COUNT(IF(msgSystem = '%s', 1, NULL)) = 0 AS queue,
WHEN SUM(msgSystem = '%s') = 0 THEN 2 COUNT(IF(kind = 5, 1, NULL)) = 0 AS kind,
WHEN SUM(kind = 5) = 0 THEN 3 COUNT(serviceName) = 0 AS svc,
WHEN SUM(serviceName IS NOT NULL) = 0 THEN 4 COUNT(IF(has(stringTagMap, 'messaging.destination.name'), 1, NULL)) = 0 AS destination,
WHEN SUM(has(stringTagMap, 'messaging.destination.name')) = 0 THEN 5 COUNT(IF(has(stringTagMap, 'messaging.destination.partition.id'), 1, NULL)) = 0 AS partition,
WHEN SUM(has(stringTagMap, 'messaging.destination.partition.id')) = 0 THEN 6 COUNT(IF(has(stringTagMap, 'messaging.kafka.consumer.group'), 1, NULL)) = 0 AS cgroup,
WHEN SUM(has(stringTagMap, 'messaging.kafka.consumer.group')) = 0 THEN 7 COUNT(IF(has(numberTagMap, 'messaging.message.body.size'), 1, NULL)) = 0 AS bodysize,
WHEN SUM(has(numberTagMap, 'messaging.message.body.size')) = 0 THEN 8 COUNT(IF(has(stringTagMap, 'messaging.client_id'), 1, NULL)) = 0 AS clientid,
WHEN SUM(has(stringTagMap, 'messaging.client_id')) = 0 THEN 9 COUNT(IF(has(stringTagMap, 'service.instance.id'), 1, NULL)) = 0 AS instanceid
WHEN SUM(has(stringTagMap, 'service.instance.id')) = 0 THEN 10
ELSE 0
END AS result_code
FROM signoz_traces.distributed_signoz_index_v2 FROM signoz_traces.distributed_signoz_index_v2
WHERE WHERE
timestamp >= '%d' timestamp >= '%d'
AND timestamp <= '%d';`, queueType, start, end) AND timestamp <= '%d';`, queueType, start, end)
return query return query
} }
func onboardKafkaSQL(start, end int64) string { func onboardKafkaSQL(start, end int64) string {
query := fmt.Sprintf(` query := fmt.Sprintf(`
SELECT SELECT
CASE COUNT(*) = 0 AS entries,
WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 COUNT(IF(metric_name = 'kafka_consumer_fetch_latency_avg', 1, NULL)) = 0 AS fetchlatency,
AND COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 1 COUNT(IF(metric_name = 'kafka_consumer_group_lag', 1, NULL)) = 0 AS grouplag
WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_fetch_latency_avg' THEN 1 END) = 0 THEN 2 FROM
WHEN COUNT(CASE WHEN metric_name = 'kafka_consumer_group_lag' THEN 1 END) = 0 THEN 3 signoz_metrics.time_series_v4_1day
ELSE 0
END AS result_code
FROM signoz_metrics.time_series_v4_1day
WHERE WHERE
metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag') metric_name IN ('kafka_consumer_fetch_latency_avg', 'kafka_consumer_group_lag')
AND unix_milli >= '%d' AND unix_milli >= '%d'
AND unix_milli < '%d';`, start, end) AND unix_milli < '%d';`, start/1000000, end/1000000)
return query return query
} }

View File

@ -13,16 +13,16 @@ var defaultStepInterval int64 = 60
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) { func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
// ToDo: propagate this through APIs when there are different handlers // ToDo: propagate this through APIs when there are different handlers
queueType := kafkaQueue queueType := KafkaQueue
var cq *v3.CompositeQuery chq, err := BuildClickHouseQuery(messagingQueue, queueType, queryContext)
chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var cq *v3.CompositeQuery
cq, err = buildCompositeQuery(chq, queryContext) cq, err = buildCompositeQuery(chq, queryContext)
queryRangeParams := &v3.QueryRangeParamsV3{ queryRangeParams := &v3.QueryRangeParamsV3{
@ -37,6 +37,14 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
return queryRangeParams, nil return queryRangeParams, nil
} }
// PrepareClikhouseQueries builds the raw ClickHouse query for the given
// messaging queue and query context, defaulting the queue type to Kafka.
// NOTE: the name retains an existing typo ("Clikhouse") to keep the
// exported API stable.
func PrepareClikhouseQueries(messagingQueue *MessagingQueue, queryContext string) (*v3.ClickHouseQuery, error) {
	return BuildClickHouseQuery(messagingQueue, KafkaQueue, queryContext)
}
func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start start := messagingQueue.Start
end := messagingQueue.End end := messagingQueue.End
@ -137,7 +145,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) { func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
queueType := kafkaQueue queueType := KafkaQueue
unixMilliStart := messagingQueue.Start / 1000000 unixMilliStart := messagingQueue.Start / 1000000
unixMilliEnd := messagingQueue.End / 1000000 unixMilliEnd := messagingQueue.End / 1000000
@ -177,7 +185,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a
return queryRangeParams, nil return queryRangeParams, nil
} }
func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start start := messagingQueue.Start
end := messagingQueue.End end := messagingQueue.End