chore: pr-reviews

This commit is contained in:
shivanshu 2024-07-26 15:23:31 +05:30
parent 5c3ce146fa
commit 063c9adba6
No known key found for this signature in database
GPG Key ID: 0F9ACBC3AA12DC71
2 changed files with 22 additions and 56 deletions

View File

@ -2263,7 +2263,7 @@ func (aH *APIHandler) getProducerData(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
// parse the query params to retrieve the messaging queue struct // parse the query params to retrieve the messaging queue struct
messagingQueue, apiErr := ParseMessagingQueueParams(r) messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil { if apiErr != nil {
zap.L().Error(apiErr.Err.Error()) zap.L().Error(apiErr.Err.Error())
@ -2304,7 +2304,7 @@ func (aH *APIHandler) getProducerData(
func (aH *APIHandler) getConsumerData( func (aH *APIHandler) getConsumerData(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
messagingQueue, apiErr := ParseMessagingQueueParams(r) messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil { if apiErr != nil {
zap.L().Error(apiErr.Err.Error()) zap.L().Error(apiErr.Err.Error())
@ -2342,10 +2342,10 @@ func (aH *APIHandler) getConsumerData(
aH.Respond(w, resp) aH.Respond(w, resp)
} }
// ParseMessagingQueueParams parse for messaging queue params // ParseMessagingQueueBody parse for messaging queue params
func ParseMessagingQueueParams(r *http.Request) (*mq.MessagingQueue, *model.ApiError) { func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) {
var messagingQueue *mq.MessagingQueue messagingQueue := new(mq.MessagingQueue)
if err := json.NewDecoder(r.Body).Decode(&messagingQueue); err != nil { if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
} }
return messagingQueue, nil return messagingQueue, nil

View File

@ -14,26 +14,14 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
queueType := kafkaQueue queueType := kafkaQueue
var cq *v3.CompositeQuery var cq *v3.CompositeQuery
if queryContext == "producer" {
chq, err := buildProducerClickHouseQuery(messagingQueue, queueType) chq, err := buildClickHouseQuery(messagingQueue, queueType, queryContext)
if err != nil {
return nil, err
}
cq, err = buildCompositeQueryProducer(chq)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else if queryContext == "consumer" {
chq, err := buildConsumerClickHouseQuery(messagingQueue, queueType) cq, err = buildCompositeQuery(chq, queryContext)
if err != nil {
return nil, err
}
cq, err = buildCompositeQueryConsumer(chq)
if err != nil {
return nil, err
}
}
queryRangeParams := &v3.QueryRangeParamsV3{ queryRangeParams := &v3.QueryRangeParamsV3{
Start: messagingQueue.Start, Start: messagingQueue.Start,
@ -47,7 +35,7 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
return queryRangeParams, nil return queryRangeParams, nil
} }
func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { func buildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start start := messagingQueue.Start
end := messagingQueue.End end := messagingQueue.End
topic, ok := messagingQueue.Variables["topic"] topic, ok := messagingQueue.Variables["topic"]
@ -60,45 +48,23 @@ func buildProducerClickHouseQuery(messagingQueue *MessagingQueue, queueType stri
if !ok { if !ok {
return nil, fmt.Errorf("invalid type for Partition") return nil, fmt.Errorf("invalid type for Partition")
} }
query := generateProducerSQL(start, end, topic, partition, queueType)
var query string
if queryContext == "producer" {
query = generateProducerSQL(start, end, topic, partition, queueType)
} else if queryContext == "consumer" {
query = generateConsumerSQL(start, end, topic, partition, queueType)
}
return &v3.ClickHouseQuery{ return &v3.ClickHouseQuery{
Query: query, Query: query,
}, nil }, nil
} }
func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) { func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
topic, ok := messagingQueue.Variables["topic"]
if !ok {
return nil, fmt.Errorf("invalid type for Topic")
}
partition, ok := messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
}
query := generateConsumerSQL(start, end, topic, partition, queueType)
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
func buildCompositeQueryProducer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) {
return &v3.CompositeQuery{ return &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL, QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{"producer": chq}, ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},
PanelType: v3.PanelTypeTable,
}, nil
}
func buildCompositeQueryConsumer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) {
return &v3.CompositeQuery{
QueryType: v3.QueryTypeClickHouseSQL,
ClickHouseQueries: map[string]*v3.ClickHouseQuery{"consumer": chq},
PanelType: v3.PanelTypeTable, PanelType: v3.PanelTypeTable,
}, nil }, nil
} }