feat: change to builder queries

shivanshu 2024-08-26 17:56:03 +05:30
parent 5cdcbef00c
commit 666f601ecd
GPG Key ID: 0F9ACBC3AA12DC71
5 changed files with 115 additions and 33 deletions

View File

@@ -2540,10 +2540,11 @@ func (aH *APIHandler) getNetworkData(
 	for _, res := range result {
 		table := res.Table
 		for _, row := range table.Rows {
-			if row.Data["consumer_id"] != nil && row.Data["serviceName"] != nil {
-				consumerId := row.Data["consumer_id"].(string)
-				serviceName := row.Data["serviceName"].(string)
-				attributeCache = append(attributeCache, mq.Clients{ConsumerId: consumerId, ServiceName: serviceName})
+			if row.Data["client_id"] != nil && row.Data["service_instance_id"] != nil && row.Data["service_name"] != nil {
+				clientID := row.Data["client_id"].(string)
+				serviceInstanceId := row.Data["service_instance_id"].(string)
+				ServiceName := row.Data["service_name"].(string)
+				attributeCache = append(attributeCache, mq.Clients{ClientID: clientID, ServiceInstanceID: serviceInstanceId, ServiceName: ServiceName})
 			}
 		}
 	}

View File

@@ -225,7 +225,7 @@ response in query range format `series`
       "table": {
         "columns": [
           {
-            "name": "consumer_id",
+            "name": "client_id",
             "queryName": "",
             "isValueColumn": false
           },
@@ -248,7 +248,7 @@ response in query range format `series`
         "rows": [
           {
             "data": {
-              "consumer_id": "consumer-cg1-1",
+              "client_id": "consumer-cg1-1",
               "instance_id": "e33ffd7c-827a-427a-828e-547e00cb80d8",
               "serviceName": "consumer-svc",
               "throughput": 0.00035
@@ -256,7 +256,7 @@ response in query range format `series`
           },
           {
             "data": {
-              "consumer_id": "consumer-cg1-1",
+              "client_id": "consumer-cg1-1",
               "instance_id": "a96ff029-6f14-435a-a3d4-ab4742b4347f",
               "serviceName": "consumer-svc",
               "throughput": 0.00027
@@ -264,7 +264,7 @@ response in query range format `series`
           },
          {
            "data": {
-              "consumer_id": "consumer-cg1-1",
+              "client_id": "consumer-cg1-1",
              "instance_id": "ac4833a8-fbe1-4592-a0ff-241f46a0851d",
              "serviceName": "consumer-svc-2",
              "throughput": 0.00019
@@ -272,7 +272,7 @@ response in query range format `series`
          },
          {
            "data": {
-              "consumer_id": "consumer-cg1-1",
+              "client_id": "consumer-cg1-1",
              "instance_id": "9e87227f-a564-4b55-bf7c-fb00365d9400",
              "serviceName": "consumer-svc",
              "throughput": 0.00008

View File

@@ -9,6 +9,7 @@ type MessagingQueue struct {
 }
 
 type Clients struct {
-	ConsumerId  string
-	ServiceName string
+	ClientID          string
+	ServiceInstanceID string
+	ServiceName       string
 }

View File

@@ -79,9 +79,9 @@ func generateNetworkLatencyThroughputSQL(start, end int64, consumerGroup, queueT
 	query := fmt.Sprintf(`
 --- Subquery for RPS calculation, desc sorted by rps
 SELECT
-    stringTagMap['messaging.client_id'] AS consumer_id,
-    stringTagMap['service.instance.id'] AS instance_id,
-    serviceName,
+    stringTagMap['messaging.client_id'] AS client_id,
+    stringTagMap['service.instance.id'] AS service_instance_id,
+    serviceName AS service_name,
     count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds
 FROM signoz_traces.signoz_index_v2
 WHERE
@@ -90,7 +90,7 @@ WHERE
     AND kind = 5
     AND msgSystem = '%s'
     AND stringTagMap['messaging.kafka.consumer.group'] = '%s'
-GROUP BY serviceName, consumer_id, instance_id
+GROUP BY service_name, client_id, service_instance_id
 ORDER BY rps DESC
 `, end, start, start, end, queueType, consumerGroup)
 	return query
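
Usage sketch (illustrative, not part of this commit): a minimal example of how the renamed throughput generator might be exercised, assuming nanosecond start/end timestamps; the consumer group "cg1" and queue type "kafka" are placeholder arguments, and the real handler reads client_id, service_instance_id and service_name back out of the result rows.

package kafka

import (
	"fmt"
	"time"
)

// exampleThroughputSQL builds the throughput SQL for a hypothetical 30-minute window
// and prints it, purely to show the new column aliases produced by the query.
func exampleThroughputSQL() {
	endNs := time.Now().UnixNano()
	startNs := endNs - (30 * time.Minute).Nanoseconds()

	sql := generateNetworkLatencyThroughputSQL(startNs, endNs, "cg1", "kafka")
	fmt.Println(sql)
}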

View File

@@ -1,6 +1,7 @@
 package kafka
 
 import (
+	"encoding/json"
 	"fmt"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
 )
@@ -49,26 +50,98 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin
 	}, nil
 }
 
-func buildClickHouseQueriesNetwork(messagingQueue *MessagingQueue, attributeCache []Clients) (map[string]*v3.ClickHouseQuery, error) {
-	cq := make(map[string]*v3.ClickHouseQuery)
-	start := messagingQueue.Start
-	end := messagingQueue.End
-
-	for i, clientInfo := range attributeCache {
-		query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName)
-		chq := &v3.ClickHouseQuery{
-			Query: query,
-		}
-		index := fmt.Sprintf("latency_%d", i)
-		cq[index] = chq
-	}
-
-	return cq, nil
-}
+//func buildClickHouseQueriesNetwork(messagingQueue *MessagingQueue, attributeCache []Clients) (map[string]*v3.ClickHouseQuery, error) {
+//	cq := make(map[string]*v3.ClickHouseQuery)
+//	start := messagingQueue.Start
+//	end := messagingQueue.End
+//
+//	for i, clientInfo := range attributeCache {
+//		query := generateNetworkLatencyFetchSQL(defaultStepInterval, start/1000000, end/1000000, clientInfo.ConsumerId, clientInfo.ServiceName)
+//		chq := &v3.ClickHouseQuery{
+//			Query: query,
+//		}
+//		index := fmt.Sprintf("latency_%d", i)
+//		cq[index] = chq
+//	}
+//
+//	return cq, nil
+//}
+
+func buildBuilderQueriesNetwork(attributeCache []Clients) (map[string]*v3.BuilderQuery, error) {
+	bq := make(map[string]*v3.BuilderQuery)
+
+	for i, instanceInfo := range attributeCache {
+		queryName := fmt.Sprintf("latency_%d", i)
+		chq := &v3.BuilderQuery{
+			QueryName:    queryName,
+			StepInterval: defaultStepInterval,
+			DataSource:   v3.DataSourceMetrics,
+			AggregateAttribute: v3.AttributeKey{
+				Key: "kafka_consumer_fetch_latency_avg",
+			},
+			AggregateOperator: v3.AggregateOperatorAvg,
+			Temporality:       v3.Unspecified,
+			TimeAggregation:   v3.TimeAggregationAvg,
+			SpaceAggregation:  v3.SpaceAggregationAvg,
+			Filters: &v3.FilterSet{
+				Operator: "AND",
+				Items: []v3.FilterItem{
+					{
+						Key: v3.AttributeKey{
+							Key:      "service_name",
+							Type:     v3.AttributeKeyTypeTag,
+							DataType: v3.AttributeKeyDataTypeString,
+						},
+						Operator: v3.FilterOperatorEqual,
+						Value:    instanceInfo.ServiceName,
+					},
+					{
+						Key: v3.AttributeKey{
+							Key:      "client_id",
+							Type:     v3.AttributeKeyTypeTag,
+							DataType: v3.AttributeKeyDataTypeString,
+						},
+						Operator: v3.FilterOperatorEqual,
+						Value:    instanceInfo.ClientID,
+					},
+					{
+						Key: v3.AttributeKey{
+							Key:      "service_instance_id",
+							Type:     v3.AttributeKeyTypeTag,
+							DataType: v3.AttributeKeyDataTypeString,
+						},
+						Operator: v3.FilterOperatorEqual,
+						Value:    instanceInfo.ServiceInstanceID,
+					},
+				},
+			},
+			Expression: queryName,
+			ReduceTo:   v3.ReduceToOperatorAvg,
+			GroupBy: []v3.AttributeKey{{
+				Key:      "service_name",
+				DataType: v3.AttributeKeyDataTypeString,
+				Type:     v3.AttributeKeyTypeTag,
+			},
+				{
+					Key:      "client_id",
+					DataType: v3.AttributeKeyDataTypeString,
+					Type:     v3.AttributeKeyTypeTag,
+				},
+				{
+					Key:      "service_instance_id",
+					DataType: v3.AttributeKeyDataTypeString,
+					Type:     v3.AttributeKeyTypeTag,
+				},
+			},
+		}
+		bq[queryName] = chq
+	}
+
+	return bq, nil
+}
 
 func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache []Clients) (*v3.QueryRangeParamsV3, error) {
+	// ToDo: propagate this through APIs when there are different handlers
 	queueType := kafkaQueue
 
 	var cq *v3.CompositeQuery
@@ -81,26 +154,33 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a
 		}
 		cq, err = buildCompositeQuery(chq, queryContext)
 	} else if queryContext == "fetch-latency" {
-		chq, err := buildClickHouseQueriesNetwork(messagingQueue, attributeCache)
+		bhq, err := buildBuilderQueriesNetwork(attributeCache)
 		if err != nil {
 			return nil, err
 		}
 		cq = &v3.CompositeQuery{
-			QueryType:         v3.QueryTypeClickHouseSQL,
-			ClickHouseQueries: chq,
+			QueryType:      v3.QueryTypeBuilder,
+			BuilderQueries: bhq,
 			PanelType:      v3.PanelTypeTable,
 		}
 	}
 
+	unixMiliStart := messagingQueue.Start / 1000000
+	unixMiliEnd := messagingQueue.End / 1000000
+
 	queryRangeParams := &v3.QueryRangeParamsV3{
-		Start:          messagingQueue.Start,
-		End:            messagingQueue.End,
+		Start:          unixMiliStart,
+		End:            unixMiliEnd,
 		Step:           defaultStepInterval,
 		CompositeQuery: cq,
 		Version:        "v4",
 		FormatForWeb:   true,
 	}
 
+	tmp, _ := json.Marshal(queryRangeParams)
+	xx := string(tmp)
+	fmt.Print(xx)
+
 	return queryRangeParams, nil
 }
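
Usage sketch (illustrative, not part of this commit): how the fetch-latency path might be driven end to end. It assumes MessagingQueue carries nanosecond Start/End timestamps; the client ID, instance ID and service name are sample values from the docs above, standing in for rows returned by the throughput query in getNetworkData.

package kafka

import (
	"fmt"
	"time"
)

// exampleFetchLatencyParams builds query-range params for one cached client.
// One builder query ("latency_0", "latency_1", ...) is produced per Clients entry,
// each averaging kafka_consumer_fetch_latency_avg filtered by that client's attributes.
func exampleFetchLatencyParams() error {
	now := time.Now()
	mq := &MessagingQueue{
		Start: now.Add(-30 * time.Minute).UnixNano(), // assumed: nanosecond timestamps
		End:   now.UnixNano(),
	}

	attributeCache := []Clients{
		{ClientID: "consumer-cg1-1", ServiceInstanceID: "e33ffd7c-827a-427a-828e-547e00cb80d8", ServiceName: "consumer-svc"},
	}

	params, err := BuildQRParamsNetwork(mq, "fetch-latency", attributeCache)
	if err != nil {
		return err
	}
	fmt.Println(params.CompositeQuery.QueryType, len(params.CompositeQuery.BuilderQueries))
	return nil
}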