diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 5031cf7123..3e25ab23c8 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -3141,14 +3141,14 @@ func (aH *APIHandler) getProducerThroughputOverview( Hash: make(map[string]struct{}), } - queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache) + producerQueryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache) if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) return } - if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + if err := validateQueryRangeParamsV3(producerQueryRangeParams); err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) return @@ -3157,7 +3157,7 @@ func (aH *APIHandler) getProducerThroughputOverview( var result []*v3.Result var errQuriesByName map[string]error - result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), producerQueryRangeParams) if err != nil { apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} RespondError(w, apiErrObj, errQuriesByName) @@ -3165,21 +3165,21 @@ func (aH *APIHandler) getProducerThroughputOverview( } for _, res := range result { - for _, list := range res.List { - serviceName, serviceNameOk := list.Data["service_name"].(*string) - topicName, topicNameOk := list.Data["topic"].(*string) - params := []string{*serviceName, *topicName} + for _, series := range res.Series { + serviceName, serviceNameOk := series.Labels["service_name"] + topicName, topicNameOk := series.Labels["topic"] + params := []string{serviceName, topicName} hashKey := uniqueIdentifier(params, "#") _, ok := attributeCache.Hash[hashKey] if topicNameOk && serviceNameOk && !ok { attributeCache.Hash[hashKey] = struct{}{} - attributeCache.TopicName = append(attributeCache.TopicName, *topicName) - attributeCache.ServiceName = append(attributeCache.ServiceName, *serviceName) + attributeCache.TopicName = append(attributeCache.TopicName, topicName) + attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName) } } } - queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-latency", attributeCache) + queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-byte-rate", attributeCache) if err != nil { zap.L().Error(err.Error()) RespondError(w, apiErr, nil) @@ -3198,26 +3198,32 @@ func (aH *APIHandler) getProducerThroughputOverview( return } - latencyColumn := &v3.Result{QueryName: "latency"} - var latencySeries []*v3.Row + byteRateColumn := &v3.Result{QueryName: "byte_rate"} + var byteRateSeries []*v3.Series for _, res := range resultFetchLatency { - for _, list := range res.List { - topic, topicOk := list.Data["topic"].(*string) - serviceName, serviceNameOk := list.Data["service_name"].(*string) - params := []string{*serviceName, *topic} + for _, series := range res.Series { + topic, topicOk := series.Labels["topic"] + serviceName, serviceNameOk := series.Labels["service_name"] + params := []string{serviceName, topic} hashKey := uniqueIdentifier(params, "#") _, ok := attributeCache.Hash[hashKey] if topicOk && serviceNameOk && ok { - latencySeries = append(latencySeries, list) + byteRateSeries = append(byteRateSeries, series) } } } - latencyColumn.List = latencySeries - result = append(result, latencyColumn) + byteRateColumn.Series = byteRateSeries + var latencyColumnResult []*v3.Result + latencyColumnResult = append(latencyColumnResult, byteRateColumn) + resultFetchLatency = postprocess.TransformToTableForBuilderQueries(latencyColumnResult, queryRangeParams) + + result = postprocess.TransformToTableForClickHouseQueries(result) + + result = append(result, resultFetchLatency[0]) resp := v3.QueryRangeResponse{ - Result: resultFetchLatency, + Result: result, } aH.Respond(w, resp) } diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index d7dc96d470..1a60b90e56 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -61,14 +61,17 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) { bq := make(map[string]*v3.BuilderQuery) - queryName := fmt.Sprintf("latency") + queryName := fmt.Sprintf("byte_rate") chq := &v3.BuilderQuery{ QueryName: queryName, StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), DataSource: v3.DataSourceMetrics, AggregateAttribute: v3.AttributeKey{ - Key: "kafka_producer_byte_rate", + Key: "kafka_producer_byte_rate", + DataType: v3.AttributeKeyDataTypeFloat64, + Type: v3.AttributeKeyType("Gauge"), + IsColumn: true, }, AggregateOperator: v3.AggregateOperatorAvg, Temporality: v3.Unspecified, @@ -276,7 +279,7 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, cq, err = buildCompositeQuery(&v3.ClickHouseQuery{ Query: query, }, queryContext) - } else if queryContext == "producer-throughput-overview-latency" { + } else if queryContext == "producer-throughput-overview-byte-rate" { bhq, err := buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd, attributeCache) if err != nil { return nil, err @@ -284,7 +287,8 @@ func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, cq = &v3.CompositeQuery{ QueryType: v3.QueryTypeBuilder, BuilderQueries: bhq, - PanelType: v3.PanelTypeList, + PanelType: v3.PanelTypeTable, + FillGaps: false, } } @@ -315,7 +319,7 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer if !ok { return nil, fmt.Errorf("invalid type for Topic") } - if queryContext != "consumer-throughput-details" { + if !(queryContext == "consumer-throughput-details" || queryContext == "producer-throughput-details") { partition, ok = messagingQueue.Variables["partition"] if !ok { return nil, fmt.Errorf("invalid type for Partition") @@ -364,7 +368,7 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) { - if queryContext == "producer-consumer-eval" || queryContext == "producer-throughput-overview" { + if queryContext == "producer-consumer-eval" { return &v3.CompositeQuery{ QueryType: v3.QueryTypeClickHouseSQL, ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},