mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 17:15:55 +08:00
feat: add consumer and producer APIs
This commit is contained in:
parent
61e6316736
commit
481bb6e8b8
@ -43,6 +43,7 @@ import (
|
|||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
mq "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
|
||||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||||
"go.signoz.io/signoz/pkg/query-service/dao"
|
"go.signoz.io/signoz/pkg/query-service/dao"
|
||||||
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager"
|
||||||
@ -2246,9 +2247,113 @@ func (aH *APIHandler) WriteJSON(w http.ResponseWriter, r *http.Request, response
|
|||||||
w.Write(resp)
|
w.Write(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterMessagingQueuesRoutes adds messaging-queues routes
|
||||||
|
func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||||
|
// SubRouter for kafka
|
||||||
|
kafkaSubRouter := router.PathPrefix("/api/v1/messaging-queues/kafka/consumer-lag").Subrouter()
|
||||||
|
|
||||||
|
//kafkaSubRouter.HandleFunc("/consumer-lag", am.ViewAccess(aH.QueryRangeV4)).Methods(http.MethodPost)
|
||||||
|
kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
|
||||||
|
kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
|
||||||
|
|
||||||
|
// for other messaging queues, add SubRouters here
|
||||||
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) getProducerData(
|
||||||
|
w http.ResponseWriter, r *http.Request,
|
||||||
|
) {
|
||||||
|
// parse the query params to retrieve the messaging queue struct
|
||||||
|
messagingQueue, apiErr := ParseMessagingQueueParams(r)
|
||||||
|
|
||||||
|
if apiErr != nil {
|
||||||
|
zap.L().Error(apiErr.Err.Error())
|
||||||
|
RespondError(w, apiErr, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "producer")
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error(err.Error())
|
||||||
|
RespondError(w, apiErr, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
|
||||||
|
zap.L().Error(err.Error())
|
||||||
|
RespondError(w, apiErr, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []*v3.Result
|
||||||
|
var errQuriesByName map[string]error
|
||||||
|
|
||||||
|
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
|
||||||
|
if err != nil {
|
||||||
|
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||||
|
RespondError(w, apiErrObj, errQuriesByName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
result = postprocess.TransformToTableForClickHouseQueries(result)
|
||||||
|
|
||||||
|
resp := v3.QueryRangeResponse{
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
aH.Respond(w, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (aH *APIHandler) getConsumerData(
|
||||||
|
w http.ResponseWriter, r *http.Request,
|
||||||
|
) {
|
||||||
|
messagingQueue, apiErr := ParseMessagingQueueParams(r)
|
||||||
|
|
||||||
|
if apiErr != nil {
|
||||||
|
zap.L().Error(apiErr.Err.Error())
|
||||||
|
RespondError(w, apiErr, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
queryRangeParams, err := mq.BuildQueryRangeParams(messagingQueue, "consumer")
|
||||||
|
if err != nil {
|
||||||
|
zap.L().Error(err.Error())
|
||||||
|
RespondError(w, apiErr, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
|
||||||
|
zap.L().Error(err.Error())
|
||||||
|
RespondError(w, apiErr, nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []*v3.Result
|
||||||
|
var errQuriesByName map[string]error
|
||||||
|
|
||||||
|
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
|
||||||
|
if err != nil {
|
||||||
|
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||||
|
RespondError(w, apiErrObj, errQuriesByName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
result = postprocess.TransformToTableForClickHouseQueries(result)
|
||||||
|
|
||||||
|
resp := v3.QueryRangeResponse{
|
||||||
|
Result: result,
|
||||||
|
}
|
||||||
|
aH.Respond(w, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseMessagingQueueParams parse for messaging queue params
|
||||||
|
func ParseMessagingQueueParams(r *http.Request) (*mq.MessagingQueue, *model.ApiError) {
|
||||||
|
var messagingQueue *mq.MessagingQueue
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&messagingQueue); err != nil {
|
||||||
|
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
|
||||||
|
}
|
||||||
|
return messagingQueue, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Preferences
|
// Preferences
|
||||||
|
|
||||||
func (ah *APIHandler) getUserPreference(
|
func (aH *APIHandler) getUserPreference(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
preferenceId := mux.Vars(r)["preferenceId"]
|
preferenceId := mux.Vars(r)["preferenceId"]
|
||||||
@ -2262,10 +2367,10 @@ func (ah *APIHandler) getUserPreference(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, preference)
|
aH.Respond(w, preference)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) updateUserPreference(
|
func (aH *APIHandler) updateUserPreference(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
preferenceId := mux.Vars(r)["preferenceId"]
|
preferenceId := mux.Vars(r)["preferenceId"]
|
||||||
@ -2284,10 +2389,10 @@ func (ah *APIHandler) updateUserPreference(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, preference)
|
aH.Respond(w, preference)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) getAllUserPreferences(
|
func (aH *APIHandler) getAllUserPreferences(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
user := common.GetUserFromContext(r.Context())
|
user := common.GetUserFromContext(r.Context())
|
||||||
@ -2299,10 +2404,10 @@ func (ah *APIHandler) getAllUserPreferences(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, preference)
|
aH.Respond(w, preference)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) getOrgPreference(
|
func (aH *APIHandler) getOrgPreference(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
preferenceId := mux.Vars(r)["preferenceId"]
|
preferenceId := mux.Vars(r)["preferenceId"]
|
||||||
@ -2315,10 +2420,10 @@ func (ah *APIHandler) getOrgPreference(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, preference)
|
aH.Respond(w, preference)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) updateOrgPreference(
|
func (aH *APIHandler) updateOrgPreference(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
preferenceId := mux.Vars(r)["preferenceId"]
|
preferenceId := mux.Vars(r)["preferenceId"]
|
||||||
@ -2337,10 +2442,10 @@ func (ah *APIHandler) updateOrgPreference(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, preference)
|
aH.Respond(w, preference)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) getAllOrgPreferences(
|
func (aH *APIHandler) getAllOrgPreferences(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
user := common.GetUserFromContext(r.Context())
|
user := common.GetUserFromContext(r.Context())
|
||||||
@ -2352,36 +2457,36 @@ func (ah *APIHandler) getAllOrgPreferences(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, preference)
|
aH.Respond(w, preference)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Integrations
|
// RegisterIntegrationRoutes Registers all Integrations
|
||||||
func (ah *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) {
|
func (aH *APIHandler) RegisterIntegrationRoutes(router *mux.Router, am *AuthMiddleware) {
|
||||||
subRouter := router.PathPrefix("/api/v1/integrations").Subrouter()
|
subRouter := router.PathPrefix("/api/v1/integrations").Subrouter()
|
||||||
|
|
||||||
subRouter.HandleFunc(
|
subRouter.HandleFunc(
|
||||||
"/install", am.ViewAccess(ah.InstallIntegration),
|
"/install", am.ViewAccess(aH.InstallIntegration),
|
||||||
).Methods(http.MethodPost)
|
).Methods(http.MethodPost)
|
||||||
|
|
||||||
subRouter.HandleFunc(
|
subRouter.HandleFunc(
|
||||||
"/uninstall", am.ViewAccess(ah.UninstallIntegration),
|
"/uninstall", am.ViewAccess(aH.UninstallIntegration),
|
||||||
).Methods(http.MethodPost)
|
).Methods(http.MethodPost)
|
||||||
|
|
||||||
// Used for polling for status in v0
|
// Used for polling for status in v0
|
||||||
subRouter.HandleFunc(
|
subRouter.HandleFunc(
|
||||||
"/{integrationId}/connection_status", am.ViewAccess(ah.GetIntegrationConnectionStatus),
|
"/{integrationId}/connection_status", am.ViewAccess(aH.GetIntegrationConnectionStatus),
|
||||||
).Methods(http.MethodGet)
|
).Methods(http.MethodGet)
|
||||||
|
|
||||||
subRouter.HandleFunc(
|
subRouter.HandleFunc(
|
||||||
"/{integrationId}", am.ViewAccess(ah.GetIntegration),
|
"/{integrationId}", am.ViewAccess(aH.GetIntegration),
|
||||||
).Methods(http.MethodGet)
|
).Methods(http.MethodGet)
|
||||||
|
|
||||||
subRouter.HandleFunc(
|
subRouter.HandleFunc(
|
||||||
"", am.ViewAccess(ah.ListIntegrations),
|
"", am.ViewAccess(aH.ListIntegrations),
|
||||||
).Methods(http.MethodGet)
|
).Methods(http.MethodGet)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) ListIntegrations(
|
func (aH *APIHandler) ListIntegrations(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
params := map[string]string{}
|
params := map[string]string{}
|
||||||
@ -2389,21 +2494,21 @@ func (ah *APIHandler) ListIntegrations(
|
|||||||
params[k] = values[0]
|
params[k] = values[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, apiErr := ah.IntegrationsController.ListIntegrations(
|
resp, apiErr := aH.IntegrationsController.ListIntegrations(
|
||||||
r.Context(), params,
|
r.Context(), params,
|
||||||
)
|
)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
RespondError(w, apiErr, "Failed to fetch integrations")
|
RespondError(w, apiErr, "Failed to fetch integrations")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ah.Respond(w, resp)
|
aH.Respond(w, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) GetIntegration(
|
func (aH *APIHandler) GetIntegration(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
integrationId := mux.Vars(r)["integrationId"]
|
integrationId := mux.Vars(r)["integrationId"]
|
||||||
integration, apiErr := ah.IntegrationsController.GetIntegration(
|
integration, apiErr := aH.IntegrationsController.GetIntegration(
|
||||||
r.Context(), integrationId,
|
r.Context(), integrationId,
|
||||||
)
|
)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
@ -2411,14 +2516,14 @@ func (ah *APIHandler) GetIntegration(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, integration)
|
aH.Respond(w, integration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) GetIntegrationConnectionStatus(
|
func (aH *APIHandler) GetIntegrationConnectionStatus(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
integrationId := mux.Vars(r)["integrationId"]
|
integrationId := mux.Vars(r)["integrationId"]
|
||||||
isInstalled, apiErr := ah.IntegrationsController.IsIntegrationInstalled(
|
isInstalled, apiErr := aH.IntegrationsController.IsIntegrationInstalled(
|
||||||
r.Context(), integrationId,
|
r.Context(), integrationId,
|
||||||
)
|
)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
@ -2428,11 +2533,11 @@ func (ah *APIHandler) GetIntegrationConnectionStatus(
|
|||||||
|
|
||||||
// Do not spend resources calculating connection status unless installed.
|
// Do not spend resources calculating connection status unless installed.
|
||||||
if !isInstalled {
|
if !isInstalled {
|
||||||
ah.Respond(w, &integrations.IntegrationConnectionStatus{})
|
aH.Respond(w, &integrations.IntegrationConnectionStatus{})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
connectionTests, apiErr := ah.IntegrationsController.GetIntegrationConnectionTests(
|
connectionTests, apiErr := aH.IntegrationsController.GetIntegrationConnectionTests(
|
||||||
r.Context(), integrationId,
|
r.Context(), integrationId,
|
||||||
)
|
)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
@ -2446,7 +2551,7 @@ func (ah *APIHandler) GetIntegrationConnectionStatus(
|
|||||||
lookbackSeconds = 15 * 60
|
lookbackSeconds = 15 * 60
|
||||||
}
|
}
|
||||||
|
|
||||||
connectionStatus, apiErr := ah.calculateConnectionStatus(
|
connectionStatus, apiErr := aH.calculateConnectionStatus(
|
||||||
r.Context(), connectionTests, lookbackSeconds,
|
r.Context(), connectionTests, lookbackSeconds,
|
||||||
)
|
)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
@ -2454,10 +2559,10 @@ func (ah *APIHandler) GetIntegrationConnectionStatus(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, connectionStatus)
|
aH.Respond(w, connectionStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) calculateConnectionStatus(
|
func (aH *APIHandler) calculateConnectionStatus(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
connectionTests *integrations.IntegrationConnectionTests,
|
connectionTests *integrations.IntegrationConnectionTests,
|
||||||
lookbackSeconds int64,
|
lookbackSeconds int64,
|
||||||
@ -2475,7 +2580,7 @@ func (ah *APIHandler) calculateConnectionStatus(
|
|||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
logsConnStatus, apiErr := ah.calculateLogsConnectionStatus(
|
logsConnStatus, apiErr := aH.calculateLogsConnectionStatus(
|
||||||
ctx, connectionTests.Logs, lookbackSeconds,
|
ctx, connectionTests.Logs, lookbackSeconds,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -2498,7 +2603,7 @@ func (ah *APIHandler) calculateConnectionStatus(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
statusForLastReceivedMetric, apiErr := ah.reader.GetLatestReceivedMetric(
|
statusForLastReceivedMetric, apiErr := aH.reader.GetLatestReceivedMetric(
|
||||||
ctx, connectionTests.Metrics,
|
ctx, connectionTests.Metrics,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -2542,7 +2647,7 @@ func (ah *APIHandler) calculateConnectionStatus(
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) calculateLogsConnectionStatus(
|
func (aH *APIHandler) calculateLogsConnectionStatus(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
logsConnectionTest *integrations.LogsConnectionTest,
|
logsConnectionTest *integrations.LogsConnectionTest,
|
||||||
lookbackSeconds int64,
|
lookbackSeconds int64,
|
||||||
@ -2584,7 +2689,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus(
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
queryRes, _, err := ah.querier.QueryRange(
|
queryRes, _, err := aH.querier.QueryRange(
|
||||||
ctx, qrParams, map[string]v3.AttributeKey{},
|
ctx, qrParams, map[string]v3.AttributeKey{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2621,7 +2726,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus(
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) InstallIntegration(
|
func (aH *APIHandler) InstallIntegration(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
req := integrations.InstallIntegrationRequest{}
|
req := integrations.InstallIntegrationRequest{}
|
||||||
@ -2632,7 +2737,7 @@ func (ah *APIHandler) InstallIntegration(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
integration, apiErr := ah.IntegrationsController.Install(
|
integration, apiErr := aH.IntegrationsController.Install(
|
||||||
r.Context(), &req,
|
r.Context(), &req,
|
||||||
)
|
)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
@ -2640,10 +2745,10 @@ func (ah *APIHandler) InstallIntegration(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, integration)
|
aH.Respond(w, integration)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) UninstallIntegration(
|
func (aH *APIHandler) UninstallIntegration(
|
||||||
w http.ResponseWriter, r *http.Request,
|
w http.ResponseWriter, r *http.Request,
|
||||||
) {
|
) {
|
||||||
req := integrations.UninstallIntegrationRequest{}
|
req := integrations.UninstallIntegrationRequest{}
|
||||||
@ -2654,13 +2759,13 @@ func (ah *APIHandler) UninstallIntegration(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
apiErr := ah.IntegrationsController.Uninstall(r.Context(), &req)
|
apiErr := aH.IntegrationsController.Uninstall(r.Context(), &req)
|
||||||
if apiErr != nil {
|
if apiErr != nil {
|
||||||
RespondError(w, apiErr, nil)
|
RespondError(w, apiErr, nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, map[string]interface{}{})
|
aH.Respond(w, map[string]interface{}{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// logs
|
// logs
|
||||||
@ -2807,7 +2912,7 @@ func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) {
|
|||||||
return int(version64), nil
|
return int(version64), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
req := logparsingpipeline.PipelinesPreviewRequest{}
|
req := logparsingpipeline.PipelinesPreviewRequest{}
|
||||||
|
|
||||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||||
@ -2815,7 +2920,7 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resultLogs, apiErr := ah.LogsParsingPipelineController.PreviewLogsPipelines(
|
resultLogs, apiErr := aH.LogsParsingPipelineController.PreviewLogsPipelines(
|
||||||
r.Context(), &req,
|
r.Context(), &req,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -2824,10 +2929,10 @@ func (ah *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, resultLogs)
|
aH.Respond(w, resultLogs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
version, err := parseAgentConfigVersion(r)
|
version, err := parseAgentConfigVersion(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2839,20 +2944,20 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
|
|||||||
var apierr *model.ApiError
|
var apierr *model.ApiError
|
||||||
|
|
||||||
if version != -1 {
|
if version != -1 {
|
||||||
payload, apierr = ah.listLogsPipelinesByVersion(context.Background(), version)
|
payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), version)
|
||||||
} else {
|
} else {
|
||||||
payload, apierr = ah.listLogsPipelines(context.Background())
|
payload, apierr = aH.listLogsPipelines(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
if apierr != nil {
|
if apierr != nil {
|
||||||
RespondError(w, apierr, payload)
|
RespondError(w, apierr, payload)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ah.Respond(w, payload)
|
aH.Respond(w, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
// listLogsPipelines lists logs piplines for latest version
|
// listLogsPipelines lists logs piplines for latest version
|
||||||
func (ah *APIHandler) listLogsPipelines(ctx context.Context) (
|
func (aH *APIHandler) listLogsPipelines(ctx context.Context) (
|
||||||
*logparsingpipeline.PipelinesResponse, *model.ApiError,
|
*logparsingpipeline.PipelinesResponse, *model.ApiError,
|
||||||
) {
|
) {
|
||||||
// get lateset agent config
|
// get lateset agent config
|
||||||
@ -2866,7 +2971,7 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) (
|
|||||||
latestVersion = lastestConfig.Version
|
latestVersion = lastestConfig.Version
|
||||||
}
|
}
|
||||||
|
|
||||||
payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion)
|
payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, model.WrapApiError(err, "failed to get pipelines")
|
return nil, model.WrapApiError(err, "failed to get pipelines")
|
||||||
}
|
}
|
||||||
@ -2882,10 +2987,10 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
// listLogsPipelinesByVersion lists pipelines along with config version history
|
// listLogsPipelinesByVersion lists pipelines along with config version history
|
||||||
func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) (
|
func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) (
|
||||||
*logparsingpipeline.PipelinesResponse, *model.ApiError,
|
*logparsingpipeline.PipelinesResponse, *model.ApiError,
|
||||||
) {
|
) {
|
||||||
payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version)
|
payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, model.WrapApiError(err, "failed to get pipelines by version")
|
return nil, model.WrapApiError(err, "failed to get pipelines by version")
|
||||||
}
|
}
|
||||||
@ -2901,7 +3006,7 @@ func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version in
|
|||||||
return payload, nil
|
return payload, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
req := logparsingpipeline.PostablePipelines{}
|
req := logparsingpipeline.PostablePipelines{}
|
||||||
|
|
||||||
@ -2924,7 +3029,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ah.LogsParsingPipelineController.ApplyPipelines(ctx, postable)
|
return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := createPipeline(r.Context(), req.Pipelines)
|
res, err := createPipeline(r.Context(), req.Pipelines)
|
||||||
@ -2933,7 +3038,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ah.Respond(w, res)
|
aH.Respond(w, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (aH *APIHandler) getSavedViews(w http.ResponseWriter, r *http.Request) {
|
func (aH *APIHandler) getSavedViews(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -0,0 +1,251 @@
|
|||||||
|
## Consumer Lag feature break down
|
||||||
|
|
||||||
|
### 1) Consumer Lag Graph
|
||||||
|
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 2) Consumer Group Details
|
||||||
|
|
||||||
|
API endpoint:
|
||||||
|
|
||||||
|
```
|
||||||
|
POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details
|
||||||
|
```
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"start": 1720685296000000000,
|
||||||
|
"end": 1721290096000000000,
|
||||||
|
"variables": {
|
||||||
|
"partition": "0",
|
||||||
|
"topic": "topic1"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
response in query range format `series`
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"status": "success",
|
||||||
|
"data": {
|
||||||
|
"resultType": "",
|
||||||
|
"result": [
|
||||||
|
{
|
||||||
|
"queryName": "producer",
|
||||||
|
"series": [
|
||||||
|
{
|
||||||
|
"labels": {
|
||||||
|
"error_rate": "0",
|
||||||
|
"p99_query.p99": "150.08830908000002",
|
||||||
|
"rps": "0.00016534391534391533",
|
||||||
|
"service_name": "producer-svc"
|
||||||
|
},
|
||||||
|
"labelsArray": [
|
||||||
|
{
|
||||||
|
"service_name": "producer-svc"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"p99_query.p99": "150.08830908000002"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"error_rate": "0"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"rps": "0.00016534391534391533"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"values": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
response in query range format `table`
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"status": "success",
|
||||||
|
"data": {
|
||||||
|
"resultType": "",
|
||||||
|
"result": [
|
||||||
|
{
|
||||||
|
"table": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"name": "consumer_group",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "service_name",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "p99",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "error_rate",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "throughput",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "avg_msg_size",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"rows": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"avg_msg_size": "0",
|
||||||
|
"consumer_group": "cg1",
|
||||||
|
"error_rate": "0",
|
||||||
|
"p99": "0.2942205100000016",
|
||||||
|
"service_name": "consumer-svc",
|
||||||
|
"throughput": "0.00016534391534391533"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"avg_msg_size": "0",
|
||||||
|
"consumer_group": "cg3",
|
||||||
|
"error_rate": "0",
|
||||||
|
"p99": "0.216600410000002",
|
||||||
|
"service_name": "consumer-svc",
|
||||||
|
"throughput": "0.00016534391534391533"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### 3) Producer Details
|
||||||
|
|
||||||
|
API endpoint:
|
||||||
|
|
||||||
|
```
|
||||||
|
POST /api/v1/messaging-queues/kafka/consumer-lag/consumer-details
|
||||||
|
```
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"start": 1720685296000000000,
|
||||||
|
"end": 1721290096000000000,
|
||||||
|
"variables": {
|
||||||
|
"partition": "0",
|
||||||
|
"topic": "topic1"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
response in query range format `series`
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"status": "success",
|
||||||
|
"data": {
|
||||||
|
"resultType": "",
|
||||||
|
"result": [
|
||||||
|
{
|
||||||
|
"table": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"name": "service_name",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "p99_query.p99",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "error_rate",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "rps",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"rows": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"error_rate": "0",
|
||||||
|
"p99_query.p99": "150.08830908000002",
|
||||||
|
"rps": "0.00016534391534391533",
|
||||||
|
"service_name": "producer-svc"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
response in query range format `table`
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"status": "success",
|
||||||
|
"data": {
|
||||||
|
"resultType": "",
|
||||||
|
"result": [
|
||||||
|
{
|
||||||
|
"table": {
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"name": "service_name",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "p99_query.p99",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "error_rate",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "rps",
|
||||||
|
"queryName": "",
|
||||||
|
"isValueColumn": false
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"rows": [
|
||||||
|
{
|
||||||
|
"data": {
|
||||||
|
"error_rate": "0",
|
||||||
|
"p99_query.p99": "150.08830908000002",
|
||||||
|
"rps": "0.00016534391534391533",
|
||||||
|
"service_name": "producer-svc"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
@ -0,0 +1,7 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
type MessagingQueue struct {
|
||||||
|
Start int64 `json:"start"`
|
||||||
|
End int64 `json:"end"`
|
||||||
|
Variables map[string]string `json:"variables,omitempty"`
|
||||||
|
}
|
165
pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
Normal file
165
pkg/query-service/app/integrations/messagingQueues/kafka/sql.go
Normal file
@ -0,0 +1,165 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func generateConsumerSQL(start, end int64, topic, partition string) string {
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
WITH
|
||||||
|
-- Sub query for p99 calculation
|
||||||
|
p99_query AS (
|
||||||
|
SELECT
|
||||||
|
stringTagMap['messaging.kafka.consumer.group'] as consumer_group,
|
||||||
|
serviceName,
|
||||||
|
quantile(0.99)(durationNano) / 1000000 as p99
|
||||||
|
FROM signoz_traces.signoz_index_v2
|
||||||
|
WHERE
|
||||||
|
timestamp >= '%d'
|
||||||
|
AND timestamp <= '%d'
|
||||||
|
AND kind = 5
|
||||||
|
AND stringTagMap['messaging.destination.name'] = '%s'
|
||||||
|
AND stringTagMap['messaging.destination.partition.id'] = '%s'
|
||||||
|
GROUP BY consumer_group, serviceName
|
||||||
|
),
|
||||||
|
|
||||||
|
-- Sub query for RPS calculation
|
||||||
|
rps_query AS (
|
||||||
|
SELECT
|
||||||
|
stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
|
||||||
|
serviceName,
|
||||||
|
count(*) / ((%d - %d) / 1000000000) AS rps -- Convert nanoseconds to seconds
|
||||||
|
FROM signoz_traces.signoz_index_v2
|
||||||
|
WHERE
|
||||||
|
timestamp >= '%d'
|
||||||
|
AND timestamp <= '%d'
|
||||||
|
AND kind = 5
|
||||||
|
AND stringTagMap['messaging.destination.name'] = '%s'
|
||||||
|
AND stringTagMap['messaging.destination.partition.id'] = '%s'
|
||||||
|
GROUP BY consumer_group, serviceName
|
||||||
|
),
|
||||||
|
|
||||||
|
-- Sub query for error rate calculation
|
||||||
|
error_rate_query AS (
|
||||||
|
SELECT
|
||||||
|
stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
|
||||||
|
serviceName,
|
||||||
|
count(*) / ((%d - %d) / 1000000000) AS error_rate -- Convert nanoseconds to seconds
|
||||||
|
FROM signoz_traces.signoz_index_v2
|
||||||
|
WHERE
|
||||||
|
timestamp >= '%d'
|
||||||
|
AND timestamp <= '%d'
|
||||||
|
AND statusCode = 2
|
||||||
|
AND kind = 5
|
||||||
|
AND stringTagMap['messaging.destination.name'] = '%s'
|
||||||
|
AND stringTagMap['messaging.destination.partition.id'] = '%s'
|
||||||
|
GROUP BY consumer_group, serviceName
|
||||||
|
),
|
||||||
|
|
||||||
|
-- Sub query for average message size calculation
|
||||||
|
avg_msg_size_query AS (
|
||||||
|
SELECT
|
||||||
|
stringTagMap['messaging.kafka.consumer.group'] AS consumer_group,
|
||||||
|
serviceName,
|
||||||
|
avg(numberTagMap['messaging.message.body.size']) AS avg_msg_size
|
||||||
|
FROM signoz_traces.signoz_index_v2
|
||||||
|
WHERE
|
||||||
|
timestamp >= '%d'
|
||||||
|
AND timestamp <= '%d'
|
||||||
|
AND kind = 5
|
||||||
|
AND stringTagMap['messaging.destination.name'] = '%s'
|
||||||
|
AND stringTagMap['messaging.destination.partition.id'] = '%s'
|
||||||
|
GROUP BY consumer_group, serviceName
|
||||||
|
)
|
||||||
|
|
||||||
|
-- Main query to combine all metrics
|
||||||
|
SELECT
|
||||||
|
p99_query.consumer_group AS consumer_group,
|
||||||
|
p99_query.serviceName AS service_name,
|
||||||
|
p99_query.p99 AS p99,
|
||||||
|
COALESCE(error_rate_query.error_rate, 0) AS error_rate,
|
||||||
|
COALESCE(rps_query.rps, 0) AS throughput,
|
||||||
|
COALESCE(avg_msg_size_query.avg_msg_size, 0) AS avg_msg_size
|
||||||
|
FROM
|
||||||
|
p99_query
|
||||||
|
LEFT JOIN rps_query ON p99_query.consumer_group = rps_query.consumer_group
|
||||||
|
AND p99_query.serviceName = rps_query.serviceName
|
||||||
|
LEFT JOIN error_rate_query ON p99_query.consumer_group = error_rate_query.consumer_group
|
||||||
|
AND p99_query.serviceName = error_rate_query.serviceName
|
||||||
|
LEFT JOIN avg_msg_size_query ON p99_query.consumer_group = avg_msg_size_query.consumer_group
|
||||||
|
AND p99_query.serviceName = avg_msg_size_query.serviceName
|
||||||
|
ORDER BY
|
||||||
|
p99_query.consumer_group;
|
||||||
|
`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition, end, start, topic, partition)
|
||||||
|
return query
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateProducerSQL(start, end int64, topic, partition string) string {
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
|
||||||
|
-- producer
|
||||||
|
WITH
|
||||||
|
-- Subquery for p99 calculation
|
||||||
|
p99_query AS (
|
||||||
|
SELECT
|
||||||
|
serviceName,
|
||||||
|
quantile(0.99)(durationNano) / 1000000 as p99
|
||||||
|
FROM signoz_traces.signoz_index_v2
|
||||||
|
WHERE
|
||||||
|
timestamp >= '%d'
|
||||||
|
AND timestamp <= '%d'
|
||||||
|
AND kind = 4
|
||||||
|
AND stringTagMap['messaging.destination.name'] = '%s'
|
||||||
|
AND stringTagMap['messaging.destination.partition.id'] = '%s'
|
||||||
|
GROUP BY serviceName
|
||||||
|
),
|
||||||
|
|
||||||
|
-- Subquery for RPS calculation
|
||||||
|
rps_query AS (
|
||||||
|
SELECT
|
||||||
|
serviceName,
|
||||||
|
count(*) / ((%d - %d) / 1000000000) as rps -- Convert nanoseconds to seconds
|
||||||
|
FROM signoz_traces.signoz_index_v2
|
||||||
|
WHERE
|
||||||
|
timestamp >= '%d'
|
||||||
|
AND timestamp <= '%d'
|
||||||
|
AND kind = 4
|
||||||
|
AND stringTagMap['messaging.destination.name'] = '%s'
|
||||||
|
AND stringTagMap['messaging.destination.partition.id'] = '%s'
|
||||||
|
GROUP BY serviceName
|
||||||
|
),
|
||||||
|
|
||||||
|
-- Subquery for error rate calculation
|
||||||
|
error_rate_query AS (
|
||||||
|
SELECT
|
||||||
|
serviceName,
|
||||||
|
count(*) / ((%d - %d) / 1000000000) as error_rate -- Convert nanoseconds to seconds
|
||||||
|
FROM signoz_traces.signoz_index_v2
|
||||||
|
WHERE
|
||||||
|
timestamp >= '%d'
|
||||||
|
AND timestamp <= '%d'
|
||||||
|
AND statusCode = 2
|
||||||
|
AND kind = 4
|
||||||
|
AND stringTagMap['messaging.destination.name'] = '%s'
|
||||||
|
AND stringTagMap['messaging.destination.partition.id'] = '%s'
|
||||||
|
GROUP BY serviceName
|
||||||
|
)
|
||||||
|
|
||||||
|
-- Main query to combine all metrics
|
||||||
|
SELECT
|
||||||
|
p99_query.serviceName AS service_name,
|
||||||
|
p99_query.p99,
|
||||||
|
COALESCE(error_rate_query.error_rate, 0) AS error_rate,
|
||||||
|
COALESCE(rps_query.rps, 0) AS rps
|
||||||
|
FROM
|
||||||
|
p99_query
|
||||||
|
LEFT JOIN
|
||||||
|
rps_query ON p99_query.serviceName = rps_query.serviceName
|
||||||
|
LEFT JOIN
|
||||||
|
error_rate_query ON p99_query.serviceName = error_rate_query.serviceName
|
||||||
|
ORDER BY
|
||||||
|
p99_query.serviceName;
|
||||||
|
|
||||||
|
`, start, end, topic, partition, end, start, start, end, topic, partition, end, start, start, end, topic, partition)
|
||||||
|
return query
|
||||||
|
}
|
@ -0,0 +1,101 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
var defaultStepInterval int64 = 60
|
||||||
|
|
||||||
|
func BuildQueryRangeParams(messagingQueue *MessagingQueue, queryContext string) (*v3.QueryRangeParamsV3, error) {
|
||||||
|
|
||||||
|
var cq *v3.CompositeQuery
|
||||||
|
if queryContext == "producer" {
|
||||||
|
chq, err := buildProducerClickHouseQuery(messagingQueue)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cq, err = buildCompositeQueryProducer(chq)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
} else if queryContext == "consumer" {
|
||||||
|
chq, err := buildConsumerClickHouseQuery(messagingQueue)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
cq, err = buildCompositeQueryConsumer(chq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
queryRangeParams := &v3.QueryRangeParamsV3{
|
||||||
|
Start: messagingQueue.Start,
|
||||||
|
End: messagingQueue.End,
|
||||||
|
Step: defaultStepInterval,
|
||||||
|
CompositeQuery: cq,
|
||||||
|
Version: "v4",
|
||||||
|
FormatForWeb: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
return queryRangeParams, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildProducerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) {
|
||||||
|
start := messagingQueue.Start
|
||||||
|
end := messagingQueue.End
|
||||||
|
topic, ok := messagingQueue.Variables["topic"]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid type for Topic")
|
||||||
|
}
|
||||||
|
|
||||||
|
partition, ok := messagingQueue.Variables["partition"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid type for Partition")
|
||||||
|
}
|
||||||
|
query := generateProducerSQL(start, end, topic, partition)
|
||||||
|
|
||||||
|
return &v3.ClickHouseQuery{
|
||||||
|
Query: query,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildConsumerClickHouseQuery(messagingQueue *MessagingQueue) (*v3.ClickHouseQuery, error) {
|
||||||
|
start := messagingQueue.Start
|
||||||
|
end := messagingQueue.End
|
||||||
|
topic, ok := messagingQueue.Variables["topic"]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid type for Topic")
|
||||||
|
}
|
||||||
|
|
||||||
|
partition, ok := messagingQueue.Variables["partition"]
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid type for Partition")
|
||||||
|
}
|
||||||
|
query := generateConsumerSQL(start, end, topic, partition)
|
||||||
|
|
||||||
|
return &v3.ClickHouseQuery{
|
||||||
|
Query: query,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildCompositeQueryProducer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) {
|
||||||
|
return &v3.CompositeQuery{
|
||||||
|
QueryType: v3.QueryTypeClickHouseSQL,
|
||||||
|
ClickHouseQueries: map[string]*v3.ClickHouseQuery{"producer": chq},
|
||||||
|
PanelType: v3.PanelTypeTable,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildCompositeQueryConsumer(chq *v3.ClickHouseQuery) (*v3.CompositeQuery, error) {
|
||||||
|
return &v3.CompositeQuery{
|
||||||
|
QueryType: v3.QueryTypeClickHouseSQL,
|
||||||
|
ClickHouseQueries: map[string]*v3.ClickHouseQuery{"consumer": chq},
|
||||||
|
PanelType: v3.PanelTypeTable,
|
||||||
|
}, nil
|
||||||
|
}
|
10
pkg/query-service/app/integrations/messagingQueues/readme.md
Normal file
10
pkg/query-service/app/integrations/messagingQueues/readme.md
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
## Integreation: Messaging Queue
|
||||||
|
|
||||||
|
This package contains the `api`, and `translation` logic to support messaging queue features.
|
||||||
|
|
||||||
|
Currently supported queues:
|
||||||
|
1) Kafka
|
||||||
|
|
||||||
|
For detailed setup, checkout our public docs for configuring:
|
||||||
|
1) Trace collection form Clients (Producer and Consumer)
|
||||||
|
2) Metrics collection from Kafka Brokers, Producers and Consumers
|
@ -295,6 +295,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
|
|||||||
api.RegisterIntegrationRoutes(r, am)
|
api.RegisterIntegrationRoutes(r, am)
|
||||||
api.RegisterQueryRangeV3Routes(r, am)
|
api.RegisterQueryRangeV3Routes(r, am)
|
||||||
api.RegisterQueryRangeV4Routes(r, am)
|
api.RegisterQueryRangeV4Routes(r, am)
|
||||||
|
api.RegisterMessagingQueuesRoutes(r, am)
|
||||||
|
|
||||||
c := cors.New(cors.Options{
|
c := cors.New(cors.Options{
|
||||||
AllowedOrigins: []string{"*"},
|
AllowedOrigins: []string{"*"},
|
||||||
|
Loading…
x
Reference in New Issue
Block a user