From 063c9adba60d33a4522c517d89b590cd8f2c80fa Mon Sep 17 00:00:00 2001 From: shivanshu Date: Fri, 26 Jul 2024 15:23:31 +0530 Subject: [PATCH] chore: pr-reviews --- pkg/query-service/app/http_handler.go | 12 ++-- .../messagingQueues/kafka/translator.go | 66 +++++-------------- 2 files changed, 22 insertions(+), 56 deletions(-) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index b74cafabfb..729dbf77aa 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -2263,7 +2263,7 @@ func (aH *APIHandler) getProducerData( w http.ResponseWriter, r *http.Request, ) { // parse the query params to retrieve the messaging queue struct - messagingQueue, apiErr := ParseMessagingQueueParams(r) + messagingQueue, apiErr := ParseMessagingQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -2304,7 +2304,7 @@ func (aH *APIHandler) getProducerData( func (aH *APIHandler) getConsumerData( w http.ResponseWriter, r *http.Request, ) { - messagingQueue, apiErr := ParseMessagingQueueParams(r) + messagingQueue, apiErr := ParseMessagingQueueBody(r) if apiErr != nil { zap.L().Error(apiErr.Err.Error()) @@ -2342,10 +2342,10 @@ func (aH *APIHandler) getConsumerData( aH.Respond(w, resp) } -// ParseMessagingQueueParams parse for messaging queue params -func ParseMessagingQueueParams(r *http.Request) (*mq.MessagingQueue, *model.ApiError) { - var messagingQueue *mq.MessagingQueue - if err := json.NewDecoder(r.Body).Decode(&messagingQueue); err != nil { +// ParseMessagingQueueBody parse for messaging queue params +func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) { + messagingQueue := new(mq.MessagingQueue) + if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil { return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} } return messagingQueue, nil diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index 5cb67d7bf0..99760d7fbb 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -14,27 +14,15 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) queueType := kafkaQueue var cq *v3.CompositeQuery - if queryContext == "producer" { - chq, err := buildProducerClickHouseQuery(messagingQueue, queueType) - if err != nil { - return nil, err - } - cq, err = buildCompositeQueryProducer(chq) - if err != nil { - return nil, err - } - } else if queryContext == "consumer" { - chq, err := buildConsumerClickHouseQuery(messagingQueue, queueType) - if err != nil { - return nil, err - } - cq, err = buildCompositeQueryConsumer(chq) - if err != nil { - return nil, err - } + chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext) + + if err != nil { + return nil, err } + cq, err = buildCompositeQuery(chq, queryContext) + queryRangeParams := &v3.QueryRangeParamsV3{ Start: messagingQueue.Start, End: messagingQueue.End, @@ -47,7 +35,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) return queryRangeParams, nil } -func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { +func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) { start := messagingQueue.Start end := messagingQueue.End topic, ok := messagingQueue.Variables["topic"] @@ -60,45 +48,23 @@ func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType stri if !ok { return nil, fmt.Errorf("invalid type for Partition") } - query := generateProducerSQL(start, end, topic, partition, queueType) + + var query string + if queryContext == "producer" { + query = generateProducerSQL(start, end, topic, partition, queueType) + } else if queryContext == "consumer" { + query = generateConsumerSQL(start, end, topic, partition, queueType) + } return &v3.ClickHouseQuery{ Query: query, }, nil } -func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { - start := messagingQueue.Start - end := messagingQueue.End - topic, ok := messagingQueue.Variables["topic"] - if !ok { - return nil, fmt.Errorf("invalid type for Topic") - } - - partition, ok := messagingQueue.Variables["partition"] - - if !ok { - return nil, fmt.Errorf("invalid type for Partition") - } - query := generateConsumerSQL(start, end, topic, partition, queueType) - - return &v3.ClickHouseQuery{ - Query: query, - }, nil -} - -func buildCompositeQueryProducer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) { +func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) { return &v3.CompositeQuery{ QueryType: v3.QueryTypeClickHouseSQL, - ClickHouseQueries: map[string]*v3.ClickHouseQuery{"producer": chq}, - PanelType: v3.PanelTypeTable, - }, nil -} - -func buildCompositeQueryConsumer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) { - return &v3.CompositeQuery{ - QueryType: v3.QueryTypeClickHouseSQL, - ClickHouseQueries: map[string]*v3.ClickHouseQuery{"consumer": chq}, + ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq}, PanelType: v3.PanelTypeTable, }, nil }