feat: kafka Scenario 1, 3, 4 all squashed (#6144)

* feat: kafka partition level observability features
This commit is contained in:
Shivanshu Raj Shrivastava 2024-10-17 19:44:42 +05:30 committed by GitHub
parent 337a941d0d
commit e51f4d986d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1232 additions and 41 deletions

View File

@ -14,6 +14,8 @@ var SaasSegmentKey = GetOrDefaultEnv("SIGNOZ_SAAS_SEGMENT_KEY", "")
var FetchFeatures = GetOrDefaultEnv("FETCH_FEATURES", "false")
var ZeusFeaturesURL = GetOrDefaultEnv("ZEUS_FEATURES_URL", "ZeusFeaturesURL")
var KafkaSpanEval = GetOrDefaultEnv("KAFKA_SPAN_EVAL", "false")
func GetOrDefaultEnv(key string, fallback string) string {
v := os.Getenv(key)
if len(v) == 0 {

View File

@ -2516,22 +2516,35 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth
// SubRouter for kafka
kafkaRouter := router.PathPrefix("/api/v1/messaging-queues/kafka").Subrouter()
consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter()
consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)
onboardingRouter := kafkaRouter.PathPrefix("/onboarding").Subrouter()
onboardingRouter.HandleFunc("/producers", am.ViewAccess(aH.onboardProducers)).Methods(http.MethodPost)
onboardingRouter.HandleFunc("/consumers", am.ViewAccess(aH.onboardConsumers)).Methods(http.MethodPost)
onboardingRouter.HandleFunc("/kafka", am.ViewAccess(aH.onboardKafka)).Methods(http.MethodPost)
partitionLatency := kafkaRouter.PathPrefix("/partition-latency").Subrouter()
partitionLatency.HandleFunc("/overview", am.ViewAccess(aH.getPartitionOverviewLatencyData)).Methods(http.MethodPost)
partitionLatency.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerPartitionLatencyData)).Methods(http.MethodPost)
consumerLagRouter := kafkaRouter.PathPrefix("/consumer-lag").Subrouter()
consumerLagRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
consumerLagRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)
topicThroughput := kafkaRouter.PathPrefix("/topic-throughput").Subrouter()
topicThroughput.HandleFunc("/producer", am.ViewAccess(aH.getProducerThroughputOverview)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerThroughputDetails)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/consumer", am.ViewAccess(aH.getConsumerThroughputOverview)).Methods(http.MethodPost)
topicThroughput.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerThroughputDetails)).Methods(http.MethodPost)
spanEvaluation := kafkaRouter.PathPrefix("/span").Subrouter()
spanEvaluation.HandleFunc("/evaluation", am.ViewAccess(aH.getProducerConsumerEval)).Methods(http.MethodPost)
// for other messaging queues, add SubRouters here
}
// not using md5 hashing as the plain string would work
func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string) string {
return clientID + separator + serviceInstanceID + separator + serviceName
func uniqueIdentifier(params []string, separator string) string {
return strings.Join(params, separator)
}
func (aH *APIHandler) onboardProducers(
@ -2874,7 +2887,7 @@ func (aH *APIHandler) getNetworkData(
return
}
queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache)
queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "throughput", attributeCache)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@ -2901,7 +2914,8 @@ func (aH *APIHandler) getNetworkData(
clientID, clientIDOk := series.Labels["client_id"]
serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"]
serviceName, serviceNameOk := series.Labels["service_name"]
hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#")
params := []string{clientID, serviceInstanceID, serviceName}
hashKey := uniqueIdentifier(params, "#")
_, ok := attributeCache.Hash[hashKey]
if clientIDOk && serviceInstanceIDOk && serviceNameOk && !ok {
attributeCache.Hash[hashKey] = struct{}{}
@ -2912,7 +2926,7 @@ func (aH *APIHandler) getNetworkData(
}
}
queryRangeParams, err = mq.BuildQRParamsNetwork(messagingQueue, "fetch-latency", attributeCache)
queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "fetch-latency", attributeCache)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
@ -2938,7 +2952,8 @@ func (aH *APIHandler) getNetworkData(
clientID, clientIDOk := series.Labels["client_id"]
serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"]
serviceName, serviceNameOk := series.Labels["service_name"]
hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#")
params := []string{clientID, serviceInstanceID, serviceName}
hashKey := uniqueIdentifier(params, "#")
_, ok := attributeCache.Hash[hashKey]
if clientIDOk && serviceInstanceIDOk && serviceNameOk && ok {
latencySeries = append(latencySeries, series)
@ -3040,6 +3055,362 @@ func (aH *APIHandler) getConsumerData(
aH.Respond(w, resp)
}
// getPartitionOverviewLatencyData handles the Scenario 1 (S1) partition-latency
// overview request.
//
// It decodes the request body with ParseMessagingQueueBody, builds query range
// params for the "partition_latency" context, validates and executes them via
// querierV2, and responds with the result converted to table form. Any failure
// is written to w through RespondError and ends the request.
func (aH *APIHandler) getPartitionOverviewLatencyData(
	w http.ResponseWriter, r *http.Request,
) {
	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "partition_latency")
	if err != nil {
		zap.L().Error(err.Error())
		// apiErr is nil once parsing succeeded, so report err itself.
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	result, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQueriesByName)
		return
	}

	aH.Respond(w, v3.QueryRangeResponse{
		Result: postprocess.TransformToTableForClickHouseQueries(result),
	})
}
// getConsumerPartitionLatencyData handles the Scenario 1 (S1) per-consumer
// partition-latency request.
//
// It decodes the request body with ParseMessagingQueueBody, builds query range
// params for the "consumer_partition_latency" context, validates and executes
// them via querierV2, and responds with the result converted to table form.
// Any failure is written to w through RespondError and ends the request.
func (aH *APIHandler) getConsumerPartitionLatencyData(
	w http.ResponseWriter, r *http.Request,
) {
	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer_partition_latency")
	if err != nil {
		zap.L().Error(err.Error())
		// apiErr is nil once parsing succeeded, so report err itself.
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	result, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQueriesByName)
		return
	}

	aH.Respond(w, v3.QueryRangeResponse{
		Result: postprocess.TransformToTableForClickHouseQueries(result),
	})
}
// getProducerThroughputOverview handles the Scenario 3 (S3) producer
// throughput overview request. It works in three steps:
//
//  1. run the "producer-throughput-overview" query (trace based),
//  2. remember every (service_name, topic) label pair seen in its series in
//     an attribute cache,
//  3. run the "producer-throughput-overview-latency" query (byte-rate metric)
//     restricted by that cache, and keep only series whose pair is cached.
//
// The kept series are appended to the first result under the query name
// "latency" and the combined result is returned in table form. Any failure is
// written to w through RespondError and ends the request.
func (aH *APIHandler) getProducerThroughputOverview(
	w http.ResponseWriter, r *http.Request,
) {
	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	attributeCache := &mq.Clients{
		Hash: make(map[string]struct{}),
	}

	queryRangeParams, err := mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview", attributeCache)
	if err != nil {
		zap.L().Error(err.Error())
		// apiErr is nil once parsing succeeded, so report err itself.
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	result, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQueriesByName)
		return
	}

	// Collect each distinct (service_name, topic) pair once.
	for _, res := range result {
		for _, series := range res.Series {
			serviceName, serviceNameOk := series.Labels["service_name"]
			topicName, topicNameOk := series.Labels["topic"]
			hashKey := uniqueIdentifier([]string{serviceName, topicName}, "#")
			_, seen := attributeCache.Hash[hashKey]
			if topicNameOk && serviceNameOk && !seen {
				attributeCache.Hash[hashKey] = struct{}{}
				attributeCache.TopicName = append(attributeCache.TopicName, topicName)
				attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName)
			}
		}
	}

	queryRangeParams, err = mq.BuildQRParamsWithCache(messagingQueue, "producer-throughput-overview-latency", attributeCache)
	if err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQueriesByNameFetchLatency)
		return
	}

	latencyColumn := &v3.Result{QueryName: "latency"}
	var latencySeries []*v3.Series
	for _, res := range resultFetchLatency {
		for _, series := range res.Series {
			topic, topicOk := series.Labels["topic"]
			serviceName, serviceNameOk := series.Labels["service_name"]
			// The key must use the same (service_name, topic) order as when the
			// cache was filled above; the reverse order never matches.
			hashKey := uniqueIdentifier([]string{serviceName, topic}, "#")
			_, cached := attributeCache.Hash[hashKey]
			if topicOk && serviceNameOk && cached {
				latencySeries = append(latencySeries, series)
			}
		}
	}
	latencyColumn.Series = latencySeries
	result = append(result, latencyColumn)

	aH.Respond(w, v3.QueryRangeResponse{
		Result: postprocess.TransformToTableForBuilderQueries(result, queryRangeParams),
	})
}
// getProducerThroughputDetails handles the Scenario 3 (S3) producer
// throughput details request.
//
// It decodes the request body with ParseMessagingQueueBody, builds query range
// params for the "producer-throughput-details" context, validates and executes
// them via querierV2, and responds with the result converted to table form.
// Any failure is written to w through RespondError and ends the request.
func (aH *APIHandler) getProducerThroughputDetails(
	w http.ResponseWriter, r *http.Request,
) {
	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-throughput-details")
	if err != nil {
		zap.L().Error(err.Error())
		// apiErr is nil once parsing succeeded, so report err itself.
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	result, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQueriesByName)
		return
	}

	aH.Respond(w, v3.QueryRangeResponse{
		Result: postprocess.TransformToTableForClickHouseQueries(result),
	})
}
// getConsumerThroughputOverview handles the Scenario 3 (S3) consumer
// throughput overview request.
//
// It decodes the request body with ParseMessagingQueueBody, builds query range
// params for the "consumer-throughput-overview" context, validates and
// executes them via querierV2, and responds with the result converted to table
// form. Any failure is written to w through RespondError and ends the request.
func (aH *APIHandler) getConsumerThroughputOverview(
	w http.ResponseWriter, r *http.Request,
) {
	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-overview")
	if err != nil {
		zap.L().Error(err.Error())
		// apiErr is nil once parsing succeeded, so report err itself.
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	result, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQueriesByName)
		return
	}

	aH.Respond(w, v3.QueryRangeResponse{
		Result: postprocess.TransformToTableForClickHouseQueries(result),
	})
}
// getConsumerThroughputDetails handles the Scenario 3 (S3) consumer
// throughput details request.
//
// It decodes the request body with ParseMessagingQueueBody, builds query range
// params for the "consumer-throughput-details" context, validates and executes
// them via querierV2, and responds with the result converted to table form.
// Any failure is written to w through RespondError and ends the request.
func (aH *APIHandler) getConsumerThroughputDetails(
	w http.ResponseWriter, r *http.Request,
) {
	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer-throughput-details")
	if err != nil {
		zap.L().Error(err.Error())
		// apiErr is nil once parsing succeeded, so report err itself.
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	result, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQueriesByName)
		return
	}

	aH.Respond(w, v3.QueryRangeResponse{
		Result: postprocess.TransformToTableForClickHouseQueries(result),
	})
}
// getProducerConsumerEval handles the Scenario 4 (S4) producer/consumer span
// evaluation request.
//
// It decodes the request body with ParseMessagingQueueBody, builds query range
// params for the "producer-consumer-eval" context, validates and executes them
// via querierV2, and responds with the raw query result (no table transform).
// Any failure is written to w through RespondError and ends the request.
//
// TODO: needs logic to parse duration.
// TODO: needs logic to get the percentage.
// TODO: show 10 traces.
func (aH *APIHandler) getProducerConsumerEval(
	w http.ResponseWriter, r *http.Request,
) {
	messagingQueue, apiErr := ParseMessagingQueueBody(r)
	if apiErr != nil {
		zap.L().Error(apiErr.Err.Error())
		RespondError(w, apiErr, nil)
		return
	}

	queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer-consumer-eval")
	if err != nil {
		zap.L().Error(err.Error())
		// apiErr is nil once parsing succeeded, so report err itself.
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}
	if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
		zap.L().Error(err.Error())
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	result, errQueriesByName, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams)
	if err != nil {
		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQueriesByName)
		return
	}

	aH.Respond(w, v3.QueryRangeResponse{
		Result: result,
	})
}
// ParseMessagingQueueBody parse for messaging queue params
func ParseMessagingQueueBody(r *http.Request) (*mq.MessagingQueue, *model.ApiError) {
messagingQueue := new(mq.MessagingQueue)

View File

@ -28,6 +28,7 @@ Response in query range `table` format
"resultType": "",
"result": [
{
"table": {
"columns": [
{
@ -519,3 +520,461 @@ Response in query range `table` format
]
}
```
### Partition Latency
```json
/api/v1/messaging-queues/kafka/partition-latency/overview
```
```json
{
"start": 1728287046000000000,
"end": 1728587046000000000
}
```
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "topic",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "partition_latency",
"queryName": "partition_latency",
"isValueColumn": true
}
],
"rows": [
{
"data": {
"p99": "2",
"partition_latency": 1.18,
"topic": "topic1"
}
},
{
"data": {
"p99": "2",
"partition_latency": 0.15,
"topic": "topic2"
}
},
{
"data": {
"p99": "2",
"partition_latency": 0.26,
"topic": "topic3"
}
}
]
}
}
]
}
}
```
---------
```json
/api/v1/messaging-queues/kafka/partition-latency/consumer
```
```json
{
"start": 1728287046000000000,
"end": 1728587046000000000,
"variables": {
"partition": "2",
"topic": "topic1"
}
}
```
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "consumer_group",
"queryName": "",
"isValueColumn": false
},
{
"name": "service_name",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "error_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "throughput",
"queryName": "",
"isValueColumn": false
}
],
"rows": [
{
"data": {
"consumer_group": "cg1",
"error_rate": "0",
"p99": "0.11994228000000004",
"service_name": "consumer-svc",
"throughput": "1.18116"
}
}
]
}
}
]
}
}
```
---------
### Topic throughput
```json
/api/v1/messaging-queues/kafka/topic-throughput/producer
```
```json
{
"start": 1728287046000000000,
"end": 1728587046000000000
}
```
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "topic",
"queryName": "",
"isValueColumn": false
},
{
"name": "serviceName",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "error_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "throughput",
"queryName": "",
"isValueColumn": false
}
],
"rows": [
{
"data": {
"error_rate": "0",
"p99": "8.662880220000002",
"serviceName": "producer-svc1",
"throughput": "0.41642666666666667",
"topic": "topic1"
}
},
{
"data": {
"error_rate": "0",
"p99": "9.786847500000016",
"serviceName": "producer-svc2",
"throughput": "0.76473",
"topic": "topic1"
}
},
{
"data": {
"error_rate": "0",
"p99": "14.432925500000021",
"serviceName": "producer-svc3",
"throughput": "0.08976",
"topic": "topic2"
}
},
{
"data": {
"error_rate": "0",
"p99": "14.32833297000002",
"serviceName": "producer-svc2",
"throughput": "0.06449333333333333",
"topic": "topic2"
}
},
{
"data": {
"error_rate": "0",
"p99": "13.416533810000036",
"serviceName": "producer-svc4",
"throughput": "0.14766",
"topic": "topic3"
}
},
{
"data": {
"error_rate": "0",
"p99": "13.366232000000034",
"serviceName": "producer-svc3",
"throughput": "0.11166666666666666",
"topic": "topic3"
}
}
]
}
}
]
}
}
```
---------
### Topic throughput
```json
/api/v1/messaging-queues/kafka/topic-throughput/producer-details
```
```json
{
"start": 1728287046000000000,
"end": 1728587046000000000,
"variables": {
"partition": "2",
"topic": "topic1",
"service_name": "producer-svc2"
}
}
```
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "partition",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "error_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "throughput",
"queryName": "",
"isValueColumn": false
}
],
"rows": [
{
"data": {
"error_rate": "0",
"p99": "9.165558780000026",
"partition": "2",
"throughput": "0.76473"
}
}
]
}
}
]
}
}
```
---------
### Topic throughput
```json
/api/v1/messaging-queues/kafka/topic-throughput/consumer
```
```json
{
"start": 1728287046000000000,
"end": 1728587046000000000
}
```
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "topic",
"queryName": "",
"isValueColumn": false
},
{
"name": "service_name",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "error_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "ingestion_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "byte_rate",
"queryName": "",
"isValueColumn": false
}
],
"rows": [
{
"data": {
"byte_rate": "17.7174",
"error_rate": "0",
"ingestion_rate": "1.18116",
"p99": "0.12260112000000009",
"service_name": "consumer-svc",
"topic": "topic1"
}
},
{
"data": {
"byte_rate": "2.1594533333333334",
"error_rate": "0",
"ingestion_rate": "0.15424666666666667",
"p99": "7.4079657800000005",
"service_name": "consumer-svc2",
"topic": "topic2"
}
},
{
"data": {
"byte_rate": "3.66446",
"error_rate": "0",
"ingestion_rate": "0.25933",
"p99": "6.135769970000011",
"service_name": "consumer-svc3",
"topic": "topic3"
}
}
]
}
}
]
}
}
```
---------
### Topic throughput
```json
/api/v1/messaging-queues/kafka/topic-throughput/consumer-details
```
```json
{
"start": 1728287046000000000,
"end": 1728587046000000000,
"variables": {
"topic": "topic1",
"service_name": "consumer-svc"
}
}
```
```json
{
"status": "success",
"data": {
"resultType": "",
"result": [
{
"table": {
"columns": [
{
"name": "partition",
"queryName": "",
"isValueColumn": false
},
{
"name": "p99",
"queryName": "",
"isValueColumn": false
},
{
"name": "error_rate",
"queryName": "",
"isValueColumn": false
},
{
"name": "throughput",
"queryName": "",
"isValueColumn": false
}
],
"rows": [
{
"data": {
"error_rate": "0",
"p99": "0.11789381000000003",
"partition": "2",
"throughput": "1.18116"
}
}
]
}
}
]
}
}
```

View File

@ -5,6 +5,7 @@ const KafkaQueue = "kafka"
// MessagingQueue is the JSON request payload shared by the messaging-queue
// endpoints.
type MessagingQueue struct {
// Start is the beginning of the query window in epoch nanoseconds (the query
// builders divide it by 1e6 for milliseconds and by 1e9 for seconds).
Start int64 `json:"start"`
// End is the end of the query window, in the same unit as Start.
End int64 `json:"end"`
// EvalTime is an evaluation threshold.
// NOTE(review): presumably the nanosecond limit used by the span-evaluation
// query — confirm against the caller.
EvalTime int64 `json:"eval_time"`
// Variables carries optional per-request filters; the documented requests use
// keys such as "topic", "partition" and "service_name".
Variables map[string]string `json:"variables,omitempty"`
}
@ -13,6 +14,7 @@ type Clients struct {
ClientID []string
ServiceInstanceID []string
ServiceName []string
TopicName []string
}
type OnboardingResponse struct {

View File

@ -12,7 +12,7 @@ WITH consumer_query AS (
serviceName,
quantile(0.99)(durationNano) / 1000000 AS p99,
COUNT(*) AS total_requests,
SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count,
sumIf(1, statusCode = 2) AS error_count,
avg(CASE WHEN has(numberTagMap, 'messaging.message.body.size') THEN numberTagMap['messaging.message.body.size'] ELSE NULL END) AS avg_msg_size
FROM signoz_traces.distributed_signoz_index_v2
WHERE
@ -30,7 +30,7 @@ SELECT
serviceName AS service_name,
p99,
COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
COALESCE(total_requests / %d, 0) AS throughput, -- Convert nanoseconds to seconds
COALESCE(total_requests / %d, 0) AS throughput,
COALESCE(avg_msg_size, 0) AS avg_msg_size
FROM
consumer_query
@ -40,6 +40,257 @@ ORDER BY
return query
}
// generatePartitionLatencySQL renders the ClickHouse query for the Scenario 1
// (S1) landing view: for every (topic, partition) pair of kind = 4 spans in
// [start, end] it selects the p99 of durationNano divided by 1e6 and the
// request count divided by the window length.
//
// start and end are interpolated as-is into the timestamp bounds; the window
// length is (end - start) / 1e9 using integer division, so it truncates.
// queueType is matched against msgSystem.
func generatePartitionLatencySQL(start, end int64, queueType string) string {
	const nanosPerSecond = 1000000000
	const sqlTemplate = `
WITH partition_query AS (
SELECT
quantile(0.99)(durationNano) / 1000000 AS p99,
count(*) AS total_requests,
stringTagMap['messaging.destination.name'] AS topic,
stringTagMap['messaging.destination.partition.id'] AS partition
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 4
AND msgSystem = '%s'
GROUP BY topic, partition
)
SELECT
topic,
partition,
p99,
COALESCE(total_requests / %d, 0) AS throughput
FROM
partition_query
ORDER BY
topic;
`
	windowSeconds := (end - start) / nanosPerSecond
	return fmt.Sprintf(sqlTemplate, start, end, queueType, windowSeconds)
}
// generateConsumerPartitionLatencySQL renders the ClickHouse query for the
// Scenario 1 (S1) consumer view: for kind = 5 spans in [start, end] on the
// given topic and partition it selects, per (consumer group, service), the p99
// of durationNano divided by 1e6, the error rate (statusCode = 2) as a
// percentage, and the request count divided by the window length.
//
// The window length is (end - start) / 1e9 using integer division. topic,
// partition and queueType are placed inside single-quoted SQL string literals;
// because topic and partition originate from the request, backslashes and
// single quotes in them are escaped so a value cannot break out of its literal.
func generateConsumerPartitionLatencySQL(start, end int64, topic, partition, queueType string) string {
	// quote escapes the two characters that can terminate or alter a
	// single-quoted ClickHouse string literal.
	quote := func(s string) string {
		out := make([]byte, 0, len(s))
		for i := 0; i < len(s); i++ {
			if s[i] == '\\' || s[i] == '\'' {
				out = append(out, '\\')
			}
			out = append(out, s[i])
		}
		return string(out)
	}

	timeRange := (end - start) / 1000000000
	query := fmt.Sprintf(`
WITH consumer_pl AS (
SELECT
stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
serviceName,
quantile(0.99)(durationNano) / 1000000 AS p99,
COUNT(*) AS total_requests,
sumIf(1, statusCode = 2) AS error_count
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
AND stringTagMap['messaging.destination.partition.id'] = '%s'
GROUP BY consumer_group, serviceName
)
SELECT
consumer_group,
serviceName AS service_name,
p99,
COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
COALESCE(total_requests / %d, 0) AS throughput
FROM
consumer_pl
ORDER BY
consumer_group;
`, start, end, quote(queueType), quote(topic), quote(partition), timeRange)
	return query
}
// generateProducerPartitionThroughputSQL renders the ClickHouse query for the
// Scenario 3 (S3) producer overview: for kind = 4 spans in [start, end] it
// selects, per (topic, service), the p99 of durationNano divided by 1e6, the
// error rate (statusCode = 2) as a percentage, and the request count divided
// by the window length.
//
// The window length is (end - start) / 1e9 using integer division. queueType
// is matched against msgSystem.
func generateProducerPartitionThroughputSQL(start, end int64, queueType string) string {
	const nanosPerSecond = 1000000000
	// Output columns: topic, service, error rate, p99 and request rate.
	const sqlTemplate = `
WITH producer_latency AS (
SELECT
serviceName,
quantile(0.99)(durationNano) / 1000000 AS p99,
stringTagMap['messaging.destination.name'] AS topic,
COUNT(*) AS total_requests,
sumIf(1, statusCode = 2) AS error_count
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 4
AND msgSystem = '%s'
GROUP BY topic, serviceName
)
SELECT
topic,
serviceName AS service_name,
p99,
COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
COALESCE(total_requests / %d, 0) AS throughput
FROM
producer_latency
`
	windowSeconds := (end - start) / nanosPerSecond
	return fmt.Sprintf(sqlTemplate, start, end, queueType, windowSeconds)
}
// generateProducerTopicLatencySQL renders the ClickHouse query for the
// Scenario 3 (S3) producer topic/service view: for kind = 4 spans in
// [start, end] emitted by service on topic it selects, per partition, the p99
// of durationNano divided by 1e6, the error rate (statusCode = 2) as a
// percentage, and the request count divided by the window length.
//
// The window length is (end - start) / 1e9 using integer division. topic,
// service and queueType are placed inside single-quoted SQL string literals;
// because topic and service originate from the request, backslashes and single
// quotes in them are escaped so a value cannot break out of its literal.
func generateProducerTopicLatencySQL(start, end int64, topic, service, queueType string) string {
	// quote escapes the two characters that can terminate or alter a
	// single-quoted ClickHouse string literal.
	quote := func(s string) string {
		out := make([]byte, 0, len(s))
		for i := 0; i < len(s); i++ {
			if s[i] == '\\' || s[i] == '\'' {
				out = append(out, '\\')
			}
			out = append(out, s[i])
		}
		return string(out)
	}

	timeRange := (end - start) / 1000000000
	query := fmt.Sprintf(`
WITH consumer_latency AS (
SELECT
quantile(0.99)(durationNano) / 1000000 AS p99,
stringTagMap['messaging.destination.partition.id'] AS partition,
COUNT(*) AS total_requests,
sumIf(1, statusCode = 2) AS error_count
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 4
AND serviceName = '%s'
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
GROUP BY partition
)
SELECT
partition,
p99,
COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
COALESCE(total_requests / %d, 0) AS throughput
FROM
consumer_latency
`, start, end, quote(service), quote(queueType), quote(topic), timeRange)
	return query
}
// generateConsumerLatencySQL renders the ClickHouse query for the Scenario 3
// (S3) consumer overview: for kind = 5 spans in [start, end] it selects, per
// (topic, service), the p99 of durationNano divided by 1e6, the error rate
// (statusCode = 2) as a percentage, and both the request count and the summed
// 'messaging.message.body.size' divided by the window length.
//
// The window length is (end - start) / 1e9 using integer division. queueType
// is matched against msgSystem.
func generateConsumerLatencySQL(start, end int64, queueType string) string {
	const nanosPerSecond = 1000000000
	const sqlTemplate = `
WITH consumer_latency AS (
SELECT
serviceName,
stringTagMap['messaging.destination.name'] AS topic,
quantile(0.99)(durationNano) / 1000000 AS p99,
COUNT(*) AS total_requests,
sumIf(1, statusCode = 2) AS error_count,
SUM(numberTagMap['messaging.message.body.size']) AS total_bytes
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND msgSystem = '%s'
GROUP BY topic, serviceName
)
SELECT
topic,
serviceName AS service_name,
p99,
COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
COALESCE(total_requests / %d, 0) AS ingestion_rate,
COALESCE(total_bytes / %d, 0) AS byte_rate
FROM
consumer_latency
ORDER BY
topic;
`
	windowSeconds := (end - start) / nanosPerSecond
	// The same window divides both the request count and the byte total.
	return fmt.Sprintf(sqlTemplate, start, end, queueType, windowSeconds, windowSeconds)
}
// generateConsumerServiceLatencySQL renders the ClickHouse query for the
// Scenario 3 (S3) consumer topic/service view: for kind = 5 spans in
// [start, end] handled by service on topic it selects, per partition, the p99
// of durationNano divided by 1e6, the error rate (statusCode = 2) as a
// percentage, and the request count divided by the window length.
//
// The window length is (end - start) / 1e9 using integer division. topic,
// service and queueType are placed inside single-quoted SQL string literals;
// because topic and service originate from the request, backslashes and single
// quotes in them are escaped so a value cannot break out of its literal.
func generateConsumerServiceLatencySQL(start, end int64, topic, service, queueType string) string {
	// quote escapes the two characters that can terminate or alter a
	// single-quoted ClickHouse string literal.
	quote := func(s string) string {
		out := make([]byte, 0, len(s))
		for i := 0; i < len(s); i++ {
			if s[i] == '\\' || s[i] == '\'' {
				out = append(out, '\\')
			}
			out = append(out, s[i])
		}
		return string(out)
	}

	timeRange := (end - start) / 1000000000
	query := fmt.Sprintf(`
WITH consumer_latency AS (
SELECT
quantile(0.99)(durationNano) / 1000000 AS p99,
stringTagMap['messaging.destination.partition.id'] AS partition,
COUNT(*) AS total_requests,
sumIf(1, statusCode = 2) AS error_count
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
AND timestamp <= '%d'
AND kind = 5
AND serviceName = '%s'
AND msgSystem = '%s'
AND stringTagMap['messaging.destination.name'] = '%s'
GROUP BY partition
)
SELECT
partition,
p99,
COALESCE((error_count * 100.0) / total_requests, 0) AS error_rate,
COALESCE(total_requests / %d, 0) AS throughput
FROM
consumer_latency
`, start, end, quote(service), quote(queueType), quote(topic), timeRange)
	return query
}
// generateProducerConsumerEvalSQL renders the ClickHouse query for the
// Scenario 4 (S4) span evaluation. It self-joins the span index so that each
// kind = 4 span (alias p) is paired with the kind = 5 span (alias c) in the
// same trace whose parentSpanID is p's spanID, restricted to spans of both
// sides inside [start, end] and with msgSystem equal to queueType.
//
// For every (producer service, consumer service) pair the query reports the
// number of pairs, how many have a time_difference above evalTime, that share
// as a percentage, and up to 10 trace IDs of breaching pairs ordered by
// descending time_difference. time_difference is the timestamp gap between c
// and p, in nanoseconds, plus p's durationNano.
func generateProducerConsumerEvalSQL(start, end int64, queueType string, evalTime int64) string {
	const sqlTemplate = `
WITH trace_data AS (
SELECT
p.serviceName AS producer_service,
c.serviceName AS consumer_service,
p.traceID,
p.timestamp AS producer_timestamp,
c.timestamp AS consumer_timestamp,
p.durationNano AS durationNano,
(toUnixTimestamp64Nano(c.timestamp) - toUnixTimestamp64Nano(p.timestamp)) + p.durationNano AS time_difference
FROM
signoz_traces.distributed_signoz_index_v2 p
INNER JOIN
signoz_traces.distributed_signoz_index_v2 c
ON p.traceID = c.traceID
AND c.parentSpanID = p.spanID
WHERE
p.kind = 4
AND c.kind = 5
AND toUnixTimestamp64Nano(p.timestamp) BETWEEN '%d' AND '%d'
AND toUnixTimestamp64Nano(c.timestamp) BETWEEN '%d' AND '%d'
AND c.msgSystem = '%s'
AND p.msgSystem = '%s'
)
SELECT
producer_service,
consumer_service,
COUNT(*) AS total_spans,
SUM(time_difference > '%d') AS breached_spans,
((breached_spans) * 100.0) / total_spans AS breach_percentage,
arraySlice(
arrayMap(x -> x.1,
arraySort(
x -> -x.2,
groupArrayIf((traceID, time_difference), time_difference > '%d')
)
),
1, 10
) AS top_traceIDs
FROM trace_data
GROUP BY
producer_service,
consumer_service
`
	// The window, queue type and threshold are each used twice: once for the
	// producer side and once for the consumer side / second threshold check.
	return fmt.Sprintf(sqlTemplate,
		start, end,
		start, end,
		queueType, queueType,
		evalTime, evalTime,
	)
}
func generateProducerSQL(start, end int64, topic, partition, queueType string) string {
timeRange := (end - start) / 1000000000
query := fmt.Sprintf(`
@ -48,7 +299,7 @@ WITH producer_query AS (
serviceName,
quantile(0.99)(durationNano) / 1000000 AS p99,
count(*) AS total_count,
SUM(CASE WHEN statusCode = 2 THEN 1 ELSE 0 END) AS error_count
sumIf(1, statusCode = 2) AS error_count
FROM signoz_traces.distributed_signoz_index_v2
WHERE
timestamp >= '%d'
@ -64,12 +315,11 @@ SELECT
serviceName AS service_name,
p99,
COALESCE((error_count * 100.0) / total_count, 0) AS error_percentage,
COALESCE(total_count / %d, 0) AS throughput -- Convert nanoseconds to seconds
COALESCE(total_count / %d, 0) AS throughput
FROM
producer_query
ORDER BY
serviceName;
`, start, end, queueType, topic, partition, timeRange)
return query
}

View File

@ -2,8 +2,7 @@ package kafka
import (
"fmt"
"strings"
"go.signoz.io/signoz/ee/query-service/constants"
"go.signoz.io/signoz/pkg/query-service/common"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
@ -12,6 +11,9 @@ var defaultStepInterval int64 = 60
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
if constants.KafkaSpanEval == "false" && queryContext == "producer-consumer-eval" {
return nil, fmt.Errorf("span evaluation feature is disabled and is experimental")
}
// ToDo: propagate this through APIs when there are different handlers
queueType := KafkaQueue
@ -37,14 +39,6 @@ func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string)
return queryRangeParams, nil
}
// PrepareClikhouseQueries builds the ClickHouse query for queryContext by
// delegating to BuildClickHouseQuery with the Kafka queue type.
func PrepareClikhouseQueries(messagingQueue *MessagingQueue, queryContext string) (*v3.ClickHouseQuery, error) {
	return BuildClickHouseQuery(messagingQueue, KafkaQueue, queryContext)
}
func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType string) (*v3.ClickHouseQuery, error) {
start := messagingQueue.Start
end := messagingQueue.End
@ -65,12 +59,60 @@ func buildClickHouseQueryNetwork(messagingQueue *MessagingQueue, queueType strin
}, nil
}
func formatstring(str []string) string {
joined := strings.Join(str, ", ")
if len(joined) <= 2 {
return ""
func buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
bq := make(map[string]*v3.BuilderQuery)
queryName := fmt.Sprintf("latency")
chq := &v3.BuilderQuery{
QueryName: queryName,
StepInterval: common.MinAllowedStepInterval(unixMilliStart, unixMilliEnd),
DataSource: v3.DataSourceMetrics,
AggregateAttribute: v3.AttributeKey{
Key: "kafka_producer_byte_rate",
},
AggregateOperator: v3.AggregateOperatorAvg,
Temporality: v3.Unspecified,
TimeAggregation: v3.TimeAggregationAvg,
SpaceAggregation: v3.SpaceAggregationAvg,
Filters: &v3.FilterSet{
Operator: "AND",
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "service_name",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorIn,
Value: attributeCache.ServiceName,
},
{
Key: v3.AttributeKey{
Key: "topic",
Type: v3.AttributeKeyTypeTag,
DataType: v3.AttributeKeyDataTypeString,
},
Operator: v3.FilterOperatorIn,
Value: attributeCache.TopicName,
},
},
},
Expression: queryName,
ReduceTo: v3.ReduceToOperatorAvg,
GroupBy: []v3.AttributeKey{{
Key: "service_name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
{
Key: "topic",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
},
}
return joined[1 : len(joined)-1]
bq[queryName] = chq
return bq, nil
}
func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCache *Clients) (map[string]*v3.BuilderQuery, error) {
@ -143,7 +185,7 @@ func buildBuilderQueriesNetwork(unixMilliStart, unixMilliEnd int64, attributeCac
return bq, nil
}
func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
// BuildQRParamsWithCache builds a composite query for queryContext and returns
// a v3.QueryRangeParamsV3. Branches visible in this excerpt:
//   - "throughput": ClickHouse query from buildClickHouseQueryNetwork
//   - "producer-throughput-overview": SQL from generateProducerPartitionThroughputSQL,
//     wrapped by buildCompositeQuery
//   - "producer-throughput-overview-latency": builder queries from
//     buildBuilderQueriesProducerBytes, shown as a table panel
//
// NOTE(review): parts of the body are elided in this view, so the list above
// may be incomplete and the exact contents of the returned params are not
// fully visible.
func BuildQRParamsWithCache(messagingQueue *MessagingQueue, queryContext string, attributeCache *Clients) (*v3.QueryRangeParamsV3, error) {
queueType := KafkaQueue
@ -151,6 +193,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a
// NOTE(review): dividing by 1000000 suggests End is in nanoseconds and the
// result is in milliseconds — confirm against MessagingQueue.
unixMilliEnd := messagingQueue.End / 1000000
var cq *v3.CompositeQuery
// Function-level err: assigned only by the "producer-throughput-overview"
// branch below and returned at the end. Branches that use := declare their own
// err, which shadows this one.
var err error
if queryContext == "throughput" {
chq, err := buildClickHouseQueryNetwork(messagingQueue, queueType)
@ -171,6 +214,24 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a
BuilderQueries: bhq,
PanelType: v3.PanelTypeTable,
}
} else if queryContext == "producer-throughput-overview" {
// The SQL generator receives the raw Start/End values, not the /1000000 ones.
start := messagingQueue.Start
end := messagingQueue.End
query := generateProducerPartitionThroughputSQL(start, end, queueType)
// Plain assignment: this writes the function-level err, which is not checked
// here and only surfaces through the final return.
cq, err = buildCompositeQuery(&v3.ClickHouseQuery{
Query: query,
}, queryContext)
} else if queryContext == "producer-throughput-overview-latency" {
// := declares a branch-local err that shadows the function-level one; it is
// checked immediately, so nothing is lost.
bhq, err := buildBuilderQueriesProducerBytes(unixMilliStart, unixMilliEnd, attributeCache)
if err != nil {
return nil, err
}
cq = &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
BuilderQueries: bhq,
PanelType: v3.PanelTypeTable,
}
}
// NOTE(review): no final else branch is visible; an unrecognised queryContext
// would presumably leave cq nil — confirm.
queryRangeParams := &v3.QueryRangeParamsV3{
@ -182,7 +243,7 @@ func BuildQRParamsNetwork(messagingQueue *MessagingQueue, queryContext string, a
FormatForWeb: true,
}
return queryRangeParams, nil
// NOTE(review): a non-nil queryRangeParams is returned even when err is set,
// so callers must check err before using the params.
return queryRangeParams, err
}
// BuildClickHouseQuery selects the SQL generator that matches queryContext and
// returns the generated statement wrapped in a v3.ClickHouseQuery. Several
// contexts first read "topic", "partition" and "service_name" from
// messagingQueue.Variables and return an error when the key is absent.
//
// NOTE(review): parts of the body are elided in this view; what is returned
// for a queryContext that matches no branch cannot be confirmed from here.
func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, queryContext string) (*v3.ClickHouseQuery, error) {
@ -190,16 +251,22 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
end := messagingQueue.End
var topic, partition string
if queryContext == "producer" || queryContext == "consumer" {
// Every context listed here must supply "topic"; all of them except
// "consumer-throughput-details" must also supply "partition".
// NOTE(review): "producer-topic-throughput" is in this list, yet its generator
// call below takes neither topic nor partition, and the call for
// "producer-throughput-details" does not take partition — confirm that
// requiring those variables is intended.
if queryContext == "producer" ||
queryContext == "consumer" ||
queryContext == "consumer_partition_latency" ||
queryContext == "producer-topic-throughput" ||
queryContext == "producer-throughput-details" ||
queryContext == "consumer-throughput-details" {
var ok bool
topic, ok = messagingQueue.Variables["topic"]
// ok is false when the key is missing; despite the wording of the message, no
// type check happens here.
if !ok {
return nil, fmt.Errorf("invalid type for Topic")
}
partition, ok = messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
if queryContext != "consumer-throughput-details" {
partition, ok = messagingQueue.Variables["partition"]
if !ok {
return nil, fmt.Errorf("invalid type for Partition")
}
}
}
@ -212,6 +279,26 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
return nil, fmt.Errorf("invalid type for consumer group")
}
query = generateConsumerSQL(start, end, topic, partition, consumerGroup, queueType)
} else if queryContext == "producer-topic-throughput" {
query = generatePartitionLatencySQL(start, end, queueType)
} else if queryContext == "consumer_partition_latency" {
query = generateConsumerPartitionLatencySQL(start, end, topic, partition, queueType)
} else if queryContext == "producer-throughput-details" {
// This context additionally needs the "service_name" variable.
svcName, ok := messagingQueue.Variables["service_name"]
if !ok {
return nil, fmt.Errorf("invalid type for service")
}
query = generateProducerTopicLatencySQL(start, end, topic, svcName, queueType)
} else if queryContext == "consumer-throughput-overview" {
query = generateConsumerLatencySQL(start, end, queueType)
} else if queryContext == "consumer-throughput-details" {
// This context additionally needs the "service_name" variable.
svcName, ok := messagingQueue.Variables["service_name"]
if !ok {
return nil, fmt.Errorf("invalid type for service")
}
query = generateConsumerServiceLatencySQL(start, end, topic, svcName, queueType)
} else if queryContext == "producer-consumer-eval" {
// The eval query is the only one that also receives messagingQueue.EvalTime.
query = generateProducerConsumerEvalSQL(start, end, queueType, messagingQueue.EvalTime)
} else if queryContext == "onboard_producers" {
query = onboardProducersSQL(start, end, queueType)
} else if queryContext == "onboard_consumers" {
@ -219,13 +306,21 @@ func BuildClickHouseQuery(messagingQueue *MessagingQueue, queueType string, quer
} else if queryContext == "onboard_kafka" {
query = onboardKafkaSQL(start, end)
}
return &v3.ClickHouseQuery{
Query: query,
}, nil
}
// buildCompositeQuery wraps chq in a ClickHouse-SQL composite query whose
// single entry is keyed by queryContext. The "producer-consumer-eval" context
// gets a list panel; the panel type for all other contexts is set in the
// remainder of the literal below.
func buildCompositeQuery(chq *v3.ClickHouseQuery, queryContext string) (*v3.CompositeQuery, error) {
	// Must match the context name BuildClickHouseQuery dispatches on
	// ("producer-consumer-eval"); any other spelling makes this branch
	// unreachable.
	if queryContext == "producer-consumer-eval" {
		return &v3.CompositeQuery{
			QueryType:         v3.QueryTypeClickHouseSQL,
			ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},
			PanelType:         v3.PanelTypeList,
		}, nil
	}
	return &v3.CompositeQuery{
		QueryType:         v3.QueryTypeClickHouseSQL,
		ClickHouseQueries: map[string]*v3.ClickHouseQuery{queryContext: chq},

View File

@ -381,7 +381,15 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
}
}
queries, err := q.builder.PrepareQueries(params)
// Builder queries are converted to SQL by the query builder. ClickHouse SQL
// queries are used verbatim, keyed by their query name. For any other query
// type, queries stays empty and err stays nil.
queries := make(map[string]string)
var err error
if params.CompositeQuery.QueryType == v3.QueryTypeBuilder {
queries, err = q.builder.PrepareQueries(params)
} else if params.CompositeQuery.QueryType == v3.QueryTypeClickHouseSQL {
for name, chQuery := range params.CompositeQuery.ClickHouseQueries {
queries[name] = chQuery.Query
}
}
if err != nil {
return nil, nil, err
@ -452,7 +460,11 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3)
case v3.QueryTypePromQL:
results, errQueriesByName, err = q.runPromQueries(ctx, params)
case v3.QueryTypeClickHouseSQL:
results, errQueriesByName, err = q.runClickHouseQueries(ctx, params)
// ClickHouse SQL queries with a list or trace panel are routed through
// runBuilderListQueries; every other panel type goes through
// runClickHouseQueries.
if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace {
results, errQueriesByName, err = q.runBuilderListQueries(ctx, params)
} else {
results, errQueriesByName, err = q.runClickHouseQueries(ctx, params)
}
default:
err = fmt.Errorf("invalid query type")
}