From ea0263cc73dd25383cda4dd0573c63f084e22264 Mon Sep 17 00:00:00 2001 From: shivanshu Date: Mon, 30 Sep 2024 18:13:39 +0530 Subject: [PATCH] feat: onboarding APIs --- pkg/query-service/app/http_handler.go | 238 ++++++++++++++++++ .../messagingQueues/kafka/consumerLag.md | 90 +++++++ .../messagingQueues/kafka/translator.go | 25 +- 3 files changed, 346 insertions(+), 7 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 5055913113..b6a7c44262 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2469,6 +2469,9 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost) kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/onboarding-producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/onboarding-consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost) + kafkaSubRouter.HandleFunc("/onboarding-kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost) // for other messaging queues, add SubRouters here } @@ -2478,6 +2481,241 @@ func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string return clientID + separator + serviceInstanceID + separator + serviceName } +func (aH *APIHandler) onboardProducers( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_producers") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + transformedSeries := &v3.Series{ + Labels: make(map[string]string), + LabelsArray: make([]map[string]string, 0), + Points: make([]v3.Point, 0), + } + + var value string + + for _, result := range result { + for _, series := range result.Series { + for _, point := range series.Points { + switch point.Value { + case 0: + value = "All attributes are present and meet the conditions" + case 1: + value = "No data available in the given time range" + case 2: + value = "messaging.system attribute is not present or not equal to kafka in your spans" + case 3: + value = "check if your producer spans has kind producer" + case 4: + value = "messaging.destination.name attribute is not present in your spans" + case 5: + value = "messaging.destination.partition.id attribute is not present in your spans" + default: + value = "Unknown problem occurred, try increasing the time" + } + } + } + } + + transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) + + for _, result := range result { + for i := range result.Series { + result.Series[i] = transformedSeries + } + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +func (aH *APIHandler) onboardConsumers( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + transformedSeries := &v3.Series{ + Labels: make(map[string]string), + LabelsArray: make([]map[string]string, 0), + Points: make([]v3.Point, 0), + } + + var value string + + for _, result := range result { + for _, series := range result.Series { + for _, point := range series.Points { + switch point.Value { + case 0: + value = "All attributes are present and meet the conditions" + case 1: + value = "No data available in the given time range" + case 2: + value = "msgSystem attribute is not present or not equal to 'kafka' in your spans" + case 3: + value = "kind attribute is not present or not equal to 5 in your spans" + case 4: + value = "serviceName attribute is not present in your spans" + case 5: + value = "messaging.destination.name attribute is not present in your spans" + case 6: + value = "messaging.destination.partition.id attribute is not present in your spans" + case 7: + value = "messaging.kafka.consumer.group attribute is not present in your spans" + case 8: + value = "messaging.message.body.size attribute is not present in your spans" + case 9: + value = "messaging.client_id attribute is not present in your spans" + case 10: + value = "service.instance.id attribute is not present in your spans" + default: + value = "Unknown problem occurred, try increasing the time" + } + } + } + } + + transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) + + for _, result := range result { + for i := range result.Series { + result.Series[i] = transformedSeries + } + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + +func (aH *APIHandler) onboardKafka( + + w http.ResponseWriter, r *http.Request, + +) { + messagingQueue, apiErr := ParseMessagingQueueBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers") + if err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + transformedSeries := &v3.Series{ + Labels: make(map[string]string), + LabelsArray: make([]map[string]string, 0), + Points: make([]v3.Point, 0), + } + + var value string + + for _, result := range result { + for _, series := range result.Series { + for _, point := range series.Points { + switch point.Value { + case 0: + value = "All required metrics are present and meet the conditions" + case 1: + value = "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range." + case 2: + value = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range." + default: + value = "Unknown problem occurred, try increasing the time" + } + } + } + } + + transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value}) + + for _, result := range result { + for i := range result.Series { + result.Series[i] = transformedSeries + } + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) +} + func (aH *APIHandler) getNetworkData( w http.ResponseWriter, r *http.Request, ) { diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md index 38b61669ff..2d4b80ac40 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/consumerLag.md @@ -226,3 +226,93 @@ Response in query range `table` format } } ``` + +### Onboarding APIs + +``` +/api/v1/messaging-queues/kafka/consumer-lag/onboarding-producers +``` + +``` +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "queryName": "onboard_producers", + "series": [ + { + "labels": {}, + "labelsArray": [ + { + "result": "All attributes are present and meet the conditions" + } + ], + "values": [] + } + ] + } + ] + } +} +``` + + +``` +/api/v1/messaging-queues/kafka/consumer-lag/onboarding-consumers +``` + +``` +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "queryName": "onboard_consumers", + "series": [ + { + "labels": {}, + "labelsArray": [ + { + "result": "All attributes are present and meet the conditions" + } + ], + "values": [] + } + ] + } + ] + } +} +``` + +``` +127.0.0.1:8080/api/v1/messaging-queues/kafka/consumer-lag/onboarding-kafka +``` + +``` +{ + "status": "success", + "data": { + "resultType": "", + "result": [ + { + "queryName": "onboard_consumers", + "series": [ + { + "labels": {}, + "labelsArray": [ + { + "result": "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range." + } + ], + "values": [] + } + ] + } + ] + } +} +``` diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 7f4266df67..196a7da081 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -180,14 +180,19 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End - topic, ok := messagingQueue.Variables["topic"] - if !ok { - return nil, fmt.Errorf("invalid type for Topic") - } - partition, ok := messagingQueue.Variables["partition"] - if !ok { - return nil, fmt.Errorf("invalid type for Partition") + var topic, partition string + + if queryContext == "producer" || queryContext == "consumer" { + var ok bool + topic, ok = messagingQueue.Variables["topic"] + if !ok { + return nil, fmt.Errorf("invalid type for Topic") + } + partition, ok = messagingQueue.Variables["partition"] + if !ok { + return nil, fmt.Errorf("invalid type for Partition") + } } var query string @@ -199,6 +204,12 @@ func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer return nil, fmt.Errorf("invalid type for consumer group") } query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType) + } else if queryContext == "onboard_producers" { + query = onboardProducersSQL(start, end, queueType) + } else if queryContext == "onboard_consumers" { + query = onboardConsumerSQL(start, end, queueType) + } else if queryContext == "onboard_kafka" { + query = onboardKafkaSQL(start, end) } return &v3.ClickHouseQuery{