diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go index ef7e7049ba..3de489a39e 100644 --- a/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator.go @@ -1,8 +1,8 @@ package kafka import ( - "encoding/json" "fmt" + "go.signoz.io/signoz/pkg/query-service/common" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) @@ -50,14 +50,14 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin }, nil } -func buildBuilderQueriesNetwork(attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { +func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { bq := make(map[string]*v3.BuilderQuery) for i, instanceInfo := range attributeCache { queryName := fmt.Sprintf("latency_%d", i) chq := &v3.BuilderQuery{ QueryName: queryName, - StepInterval: defaultStepInterval, + StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), DataSource: v3.DataSourceMetrics, AggregateAttribute: v3.AttributeKey{ Key: "kafka_consumer_fetch_latency_avg", @@ -127,6 +127,9 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a queueType := kafkaQueue + unixMilliStart := messagingQueue.Start / 1000000 + unixMilliEnd := messagingQueue.End / 1000000 + var cq *v3.CompositeQuery if queryContext == "throughput" { @@ -139,7 +142,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a cq, err = buildCompositeQuery(chq, queryContext) } else if queryContext == "fetch-latency" { - bhq, err := buildBuilderQueriesNetwork(attributeCache) + bhq, err := buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd, attributeCache) if err != nil { return nil, err } @@ -150,20 +153,14 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a } } - unixMiliStart := messagingQueue.Start / 1000000 - unixMiliEnd := messagingQueue.End / 1000000 - queryRangeParams := &v3.QueryRangeParamsV3{ - Start: unixMiliStart, - End: unixMiliEnd, + Start: unixMilliStart, + End: unixMilliEnd, Step: defaultStepInterval, CompositeQuery: cq, Version: "v4", FormatForWeb: true, } - tmp, _ := json.Marshal(queryRangeParams) - xx := string(tmp) - fmt.Print(xx) return queryRangeParams, nil }