feat: onboarding APIs

This commit is contained in:
shivanshu 2024-09-30 18:13:39 +05:30
parent f38a1d9f1c
commit ea0263cc73
No known key found for this signature in database
GPG Key ID: 0F9ACBC3AA12DC71
3 changed files with 346 additions and 7 deletions

View File

@ -2469,6 +2469,9 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth
kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/onboarding-producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/onboarding-consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/onboarding-kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost)
// for other messaging queues, add SubRouters here
}
@ -2478,6 +2481,241 @@ func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string
return clientID + separator + serviceInstanceID + separator + serviceName
}
func (aH *APIHandler) onboardProducers(
w http.ResponseWriter, r *http.Request,
) {
messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_producers")
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
transformedSeries := &v3.Series{
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0),
}
var value string
for _, result := range result {
for _, series := range result.Series {
for _, point := range series.Points {
switch point.Value {
case 0:
value = "All attributes are present and meet the conditions"
case 1:
value = "No data available in the given time range"
case 2:
value = "messaging.system attribute is not present or not equal to kafka in your spans"
case 3:
value = "check if your producer spans has kind producer"
case 4:
value = "messaging.destination.name attribute is not present in your spans"
case 5:
value = "messaging.destination.partition.id attribute is not present in your spans"
default:
value = "Unknown problem occurred, try increasing the time"
}
}
}
}
transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value})
for _, result := range result {
for i := range result.Series {
result.Series[i] = transformedSeries
}
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}
func (aH *APIHandler) onboardConsumers(
w http.ResponseWriter, r *http.Request,
) {
messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers")
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
transformedSeries := &v3.Series{
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0),
}
var value string
for _, result := range result {
for _, series := range result.Series {
for _, point := range series.Points {
switch point.Value {
case 0:
value = "All attributes are present and meet the conditions"
case 1:
value = "No data available in the given time range"
case 2:
value = "msgSystem attribute is not present or not equal to 'kafka' in your spans"
case 3:
value = "kind attribute is not present or not equal to 5 in your spans"
case 4:
value = "serviceName attribute is not present in your spans"
case 5:
value = "messaging.destination.name attribute is not present in your spans"
case 6:
value = "messaging.destination.partition.id attribute is not present in your spans"
case 7:
value = "messaging.kafka.consumer.group attribute is not present in your spans"
case 8:
value = "messaging.message.body.size attribute is not present in your spans"
case 9:
value = "messaging.client_id attribute is not present in your spans"
case 10:
value = "service.instance.id attribute is not present in your spans"
default:
value = "Unknown problem occurred, try increasing the time"
}
}
}
}
transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value})
for _, result := range result {
for i := range result.Series {
result.Series[i] = transformedSeries
}
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}
func (aH *APIHandler) onboardKafka(
w http.ResponseWriter, r *http.Request,
) {
messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "onboard_consumers")
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
result, errQuriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
transformedSeries := &v3.Series{
Labels: make(map[string]string),
LabelsArray: make([]map[string]string, 0),
Points: make([]v3.Point, 0),
}
var value string
for _, result := range result {
for _, series := range result.Series {
for _, point := range series.Points {
switch point.Value {
case 0:
value = "All required metrics are present and meet the conditions"
case 1:
value = "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range."
case 2:
value = "Metric kafka_consumer_fetch_latency_avg is not present in the given time range."
default:
value = "Unknown problem occurred, try increasing the time"
}
}
}
}
transformedSeries.LabelsArray = append(transformedSeries.LabelsArray, map[string]string{"result": value})
for _, result := range result {
for i := range result.Series {
result.Series[i] = transformedSeries
}
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}
func (aH *APIHandler) getNetworkData(
w http.ResponseWriter, r *http.Request,
) {

View File

@ -226,3 +226,93 @@ Response in query range `table` format
}
}
```
### Onboarding APIs
```
/api/v1/messaging-queues/kafka/consumer-lag/onboarding-producers
```
```
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"queryName": "onboard_producers",
"series": [
{
"labels": {},
"labelsArray": [
{
"result": "All attributes are present and meet the conditions"
}
],
"values": []
}
]
}
]
}
}
```
```
/api/v1/messaging-queues/kafka/consumer-lag/onboarding-consumers
```
```
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"queryName": "onboard_consumers",
"series": [
{
"labels": {},
"labelsArray": [
{
"result": "All attributes are present and meet the conditions"
}
],
"values": []
}
]
}
]
}
}
```
```
127.0.0.1:8080/api/v1/messaging-queues/kafka/consumer-lag/onboarding-kafka
```
```
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"queryName": "onboard_consumers",
"series": [
{
"labels": {},
"labelsArray": [
{
"result": "Neither metric (kafka_consumer_fetch_latency_avg nor kafka_consumer_group_lag) is present in the given time range."
}
],
"values": []
}
]
}
]
}
}
```

View File

@ -180,14 +180,19 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a
func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
topic, ok := messagingQueue.Variables["topic"]
if !ok {
return nil, fmt.Errorf("invalid type for Topic")
}
partition, ok := messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
var topic, partition string
if queryContext == "producer" || queryContext == "consumer" {
var ok bool
topic, ok = messagingQueue.Variables["topic"]
if !ok {
return nil, fmt.Errorf("invalid type for Topic")
}
partition, ok = messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
}
}
var query string
@ -199,6 +204,12 @@ func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
return nil, fmt.Errorf("invalid type for consumer group")
}
query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
} else if queryContext == "onboard_producers" {
query = onboardProducersSQL(start, end, queueType)
} else if queryContext == "onboard_consumers" {
query = onboardConsumerSQL(start, end, queueType)
} else if queryContext == "onboard_kafka" {
query = onboardKafkaSQL(start, end)
}
return &v3.ClickHouseQuery{