feat: add partition level granularity

This commit is contained in:
shivanshu 2024-08-27 18:27:44 +05:30
parent 8d655bf419
commit aabf364cc6
No known key found for this signature in database
GPG Key ID: 0F9ACBC3AA12DC71
5 changed files with 121 additions and 88 deletions

View File

@ -2501,9 +2501,17 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth
// for other messaging queues, add SubRouters here // for other messaging queues, add SubRouters here
} }
// not using md5 hashing as the plain string would work
func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string) string {
return clientID + separator + serviceInstanceID + separator + serviceName
}
func (aH *APIHandler) getNetworkData( func (aH *APIHandler) getNetworkData(
w http.ResponseWriter, r *http.Request, w http.ResponseWriter, r *http.Request,
) { ) {
attributeCache := &mq.Clients{
Hash: make(map[string]struct{}),
}
messagingQueue, apiErr := ParseMessagingQueueBody(r) messagingQueue, apiErr := ParseMessagingQueueBody(r)
if apiErr != nil { if apiErr != nil {
@ -2512,7 +2520,6 @@ func (aH *APIHandler) getNetworkData(
return return
} }
attributeCache := make([]mq.Clients, 0)
queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache) queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache)
if err != nil { if err != nil {
zap.L().Error(err.Error()) zap.L().Error(err.Error())
@ -2526,12 +2533,12 @@ func (aH *APIHandler) getNetworkData(
} }
var result []*v3.Result var result []*v3.Result
var errQuriesByName map[string]error var errQueriesByName map[string]error
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
if err != nil { if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName) RespondError(w, apiErrObj, errQueriesByName)
return return
} }
@ -2540,10 +2547,13 @@ func (aH *APIHandler) getNetworkData(
clientID, clientIDOk := series.Labels["client_id"] clientID, clientIDOk := series.Labels["client_id"]
serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"] serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"]
serviceName, serviceNameOk := series.Labels["service_name"] serviceName, serviceNameOk := series.Labels["service_name"]
if clientIDOk && serviceInstanceIDOk && serviceNameOk { hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#")
attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, _, ok := attributeCache.Hash[hashKey]
ServiceInstanceID: serviceInstanceID, if clientIDOk && serviceInstanceIDOk && serviceNameOk && !ok {
ServiceName: serviceName}) attributeCache.Hash[hashKey] = struct{}{}
attributeCache.ClientID = append(attributeCache.ClientID, clientID)
attributeCache.ServiceInstanceID = append(attributeCache.ServiceInstanceID, serviceInstanceID)
attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName)
} }
} }
} }
@ -2560,22 +2570,30 @@ func (aH *APIHandler) getNetworkData(
return return
} }
resultFetchLatency, errQuriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil) resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
if err != nil { if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByNameFetchLatency) RespondError(w, apiErrObj, errQueriesByNameFetchLatency)
return return
} }
latencyColoumn := &v3.Result{QueryName: "latency"} latencyColumn := &v3.Result{QueryName: "latency"}
var latencySeries []*v3.Series var latencySeries []*v3.Series
for _, res := range resultFetchLatency { for _, res := range resultFetchLatency {
for _, series := range res.Series { for _, series := range res.Series {
latencySeries = append(latencySeries, series) clientID, clientIDOk := series.Labels["client_id"]
serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"]
serviceName, serviceNameOk := series.Labels["service_name"]
hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#")
_, ok := attributeCache.Hash[hashKey]
if clientIDOk && serviceInstanceIDOk && serviceNameOk && ok {
latencySeries = append(latencySeries, series)
}
} }
} }
latencyColoumn.Series = latencySeries
result = append(result, latencyColoumn) latencyColumn.Series = latencySeries
result = append(result, latencyColumn)
resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams)

View File

@ -158,7 +158,8 @@ Request-Body
"start": 1724673937000000000, "start": 1724673937000000000,
"end": 1724675737000000000, "end": 1724675737000000000,
"variables": { "variables": {
"consumer_group": "cg1" "consumer_group": "cg1",
"partition": "0"
} }
} }
``` ```
@ -203,19 +204,19 @@ Response in query range `table` format
{ {
"data": { "data": {
"client_id": "consumer-cg1-1", "client_id": "consumer-cg1-1",
"latency": 25.21, "latency": 48.99,
"service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d", "service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24",
"service_name": "consumer-svc", "service_name": "consumer-svc",
"throughput": 24.91 "throughput": 14.97
} }
}, },
{ {
"data": { "data": {
"client_id": "consumer-cg1-1", "client_id": "consumer-cg1-1",
"latency": 49.68, "latency": 25.21,
"service_instance_id": "b0a851d7-1735-4e3f-8f5f-7c63a8a55a24", "service_instance_id": "ccf49550-2e8f-4c7b-be29-b9e0891ef93d",
"service_name": "consumer-svc", "service_name": "consumer-svc",
"throughput": 14.97 "throughput": 24.91
} }
} }
] ]

View File

@ -9,7 +9,8 @@ type MessagingQueue struct {
} }
type Clients struct { type Clients struct {
ClientID string Hash map[string]struct{}
ServiceInstanceID string ClientID []string
ServiceName string ServiceInstanceID []string
ServiceName []string
} }

View File

@ -74,14 +74,14 @@ ORDER BY
return query return query
} }
func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueType string) string { func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, partitionID, queueType string) string {
timeRange := (end - start) / 1000000000 timeRange := (end - start) / 1000000000
query := fmt.Sprintf(` query := fmt.Sprintf(`
SELECT SELECT
stringTagMap['messaging.client_id'] AS client_id, stringTagMap['messaging.client_id'] AS client_id,
stringTagMap['service.instance.id'] AS service_instance_id, stringTagMap['service.instance.id'] AS service_instance_id,
serviceName AS service_name, serviceName AS service_name,
count(*) / %d AS throughput -- Convert nanoseconds to seconds count(*) / %d AS throughput
FROM signoz_traces.distributed_signoz_index_v2 FROM signoz_traces.distributed_signoz_index_v2
WHERE WHERE
timestamp >= '%d' timestamp >= '%d'
@ -89,8 +89,9 @@ WHERE
AND kind = 5 AND kind = 5
AND msgSystem = '%s' AND msgSystem = '%s'
AND stringTagMap['messaging.kafka.consumer.group'] = '%s' AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY service_name, client_id, service_instance_id GROUP BY service_name, client_id, service_instance_id
ORDER BY throughput DESC ORDER BY throughput DESC
`, timeRange, start, end, queueType, consumerGroup) `, timeRange, start, end, queueType, consumerGroup, partitionID)
return query return query
} }

View File

@ -2,6 +2,8 @@ package kafka
import ( import (
"fmt" "fmt"
"strings"
"go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/common"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3" v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
) )
@ -43,87 +45,97 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin
return nil, fmt.Errorf("consumer_group not found in the request") return nil, fmt.Errorf("consumer_group not found in the request")
} }
query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, queueType) partitionID, ok := messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("partition not found in the request")
}
query := generateNetworkLatencyThroughputSQL(start, end, consumerGroup, partitionID, queueType)
return &v3.ClickHouseQuery{ return &v3.ClickHouseQuery{
Query: query, Query: query,
}, nil }, nil
} }
func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache []Clients) (map[string]*v3.BuilderQuery, error) { func formatstring(str []string) string {
bq := make(map[string]*v3.BuilderQuery) joined := strings.Join(str, ", ")
if len(joined) <= 2 {
return ""
}
return joined[1 : len(joined)-1]
}
for i, instanceInfo := range attributeCache { func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
queryName := fmt.Sprintf("latency_%d", i) bq := make(map[string]*v3.BuilderQuery)
chq := &v3.BuilderQuery{ queryName := fmt.Sprintf("latency")
QueryName: queryName,
StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd), chq := &v3.BuilderQuery{
DataSource: v3.DataSourceMetrics, QueryName: queryName,
AggregateAttribute: v3.AttributeKey{ StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
Key: "kafka_consumer_fetch_latency_avg", DataSource: v3.DataSourceMetrics,
}, AggregateAttribute: v3.AttributeKey{
AggregateOperator: v3.AggregateOperatorAvg, Key: "kafka_consumer_fetch_latency_avg",
Temporality: v3.Unspecified, },
TimeAggregation: v3.TimeAggregationAvg, AggregateOperator: v3.AggregateOperatorAvg,
SpaceAggregation: v3.SpaceAggregationAvg, Temporality: v3.Unspecified,
Filters: &v3.FilterSet{ TimeAggregation: v3.TimeAggregationAvg,
Operator: "AND", SpaceAggregation: v3.SpaceAggregationAvg,
Items: []v3.FilterItem{ Filters: &v3.FilterSet{
{ Operator: "AND",
Key: v3.AttributeKey{ Items: []v3.FilterItem{
Key: "service_name", {
Type: v3.AttributeKeyTypeTag, Key: v3.AttributeKey{
DataType: v3.AttributeKeyDataTypeString, Key: "service_name",
}, Type: v3.AttributeKeyTypeTag,
Operator: v3.FilterOperatorEqual, DataType: v3.AttributeKeyDataTypeString,
Value: instanceInfo.ServiceName,
}, },
{ Operator: v3.FilterOperatorIn,
Key: v3.AttributeKey{ Value: attributeCache.ServiceName,
Key: "client_id", },
Type: v3.AttributeKeyTypeTag, {
DataType: v3.AttributeKeyDataTypeString, Key: v3.AttributeKey{
}, Key: "client_id",
Operator: v3.FilterOperatorEqual, Type: v3.AttributeKeyTypeTag,
Value: instanceInfo.ClientID, DataType: v3.AttributeKeyDataTypeString,
}, },
{ Operator: v3.FilterOperatorIn,
Key: v3.AttributeKey{ Value: attributeCache.ClientID,
Key: "service_instance_id", },
Type: v3.AttributeKeyTypeTag, {
DataType: v3.AttributeKeyDataTypeString, Key: v3.AttributeKey{
}, Key: "service_instance_id",
Operator: v3.FilterOperatorEqual, Type: v3.AttributeKeyTypeTag,
Value: instanceInfo.ServiceInstanceID, DataType: v3.AttributeKeyDataTypeString,
}, },
Operator: v3.FilterOperatorIn,
Value: attributeCache.ServiceInstanceID,
}, },
}, },
Expression: queryName, },
ReduceTo: v3.ReduceToOperatorAvg, Expression: queryName,
GroupBy: []v3.AttributeKey{{ ReduceTo: v3.ReduceToOperatorAvg,
Key: "service_name", GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
{
Key: "client_id",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag, Type: v3.AttributeKeyTypeTag,
}, },
{ {
Key: "client_id", Key: "service_instance_id",
DataType: v3.AttributeKeyDataTypeString, DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag, Type: v3.AttributeKeyTypeTag,
},
{
Key: "service_instance_id",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
}, },
} },
bq[queryName] = chq
} }
bq[queryName] = chq
return bq, nil return bq, nil
} }
func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache []Clients) (*v3.QueryRangeParamsV3, error) { func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
queueType := kafkaQueue queueType := kafkaQueue