From 407654e68e7c0bec06de7499d94aa28464a2fa62 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Wed, 19 Feb 2025 16:08:58 +0530 Subject: [PATCH] feat: add APIs for third party api feat (#7005) * feat: add APIs for third party api feat Signed-off-by: Shivanshu Raj Shrivastava * fix: minor fixes Signed-off-by: Shivanshu Raj Shrivastava * test: add unit tests Signed-off-by: Shivanshu Raj Shrivastava * chore: minor changes Signed-off-by: Shivanshu Raj Shrivastava * fix: review comments Signed-off-by: Shivanshu Raj Shrivastava * test: add unit tests Signed-off-by: Shivanshu Raj Shrivastava * chore: cleanup Signed-off-by: Shivanshu Raj Shrivastava * chore: review comments Signed-off-by: Shivanshu Raj Shrivastava * chore: review comments Signed-off-by: Shivanshu Raj Shrivastava --------- Signed-off-by: Shivanshu Raj Shrivastava --- ee/query-service/app/server.go | 1 + pkg/query-service/app/http_handler.go | 116 ++++-- .../messagingQueues/celery/translator.go | 1 - .../app/integrations/thirdPartyApi/model.go | 13 + .../integrations/thirdPartyApi/translator.go | 337 ++++++++++++++++++ .../thirdPartyApi/translator_test.go | 267 ++++++++++++++ pkg/query-service/app/parser.go | 30 ++ pkg/query-service/app/server.go | 1 + .../app/traces/v3/query_builder.go | 3 + .../app/traces/v4/query_builder.go | 3 + .../app/traces/v4/query_builder_test.go | 15 + 11 files changed, 751 insertions(+), 36 deletions(-) delete mode 100644 pkg/query-service/app/integrations/messagingQueues/celery/translator.go create mode 100644 pkg/query-service/app/integrations/thirdPartyApi/model.go create mode 100644 pkg/query-service/app/integrations/thirdPartyApi/translator.go create mode 100644 pkg/query-service/app/integrations/thirdPartyApi/translator_test.go diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 99bb42df70..740278c40b 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -374,6 +374,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h apiHandler.RegisterQueryRangeV4Routes(r, am) apiHandler.RegisterWebSocketPaths(r, am) apiHandler.RegisterMessagingQueuesRoutes(r, am) + apiHandler.RegisterThirdPartyApiRoutes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index bc5593284c..0527886850 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -31,6 +31,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/app/inframetrics" "go.signoz.io/signoz/pkg/query-service/app/integrations" queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues" + "go.signoz.io/signoz/pkg/query-service/app/integrations/thirdPartyApi" "go.signoz.io/signoz/pkg/query-service/app/logs" logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3" logsv4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" @@ -2600,21 +2601,19 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth spanEvaluation := kafkaRouter.PathPrefix("/span").Subrouter() spanEvaluation.HandleFunc("/evaluation", am.ViewAccess(aH.getProducerConsumerEval)).Methods(http.MethodPost) +} - // ------------------------------------------------- - // Celery-specific routes - celeryRouter := messagingQueuesRouter.PathPrefix("/celery").Subrouter() +// RegisterThirdPartyApiRoutes adds third-party-api integration routes +func (aH *APIHandler) RegisterThirdPartyApiRoutes(router *mux.Router, am *AuthMiddleware) { - // Celery overview routes - celeryRouter.HandleFunc("/overview", am.ViewAccess(aH.getCeleryOverview)).Methods(http.MethodPost) + // Main messaging queues router + thirdPartyApiRouter := router.PathPrefix("/api/v1/third-party-apis").Subrouter() - // Celery tasks routes - celeryRouter.HandleFunc("/tasks", am.ViewAccess(aH.getCeleryTasks)).Methods(http.MethodPost) + // Domain Overview route + overviewRouter := thirdPartyApiRouter.PathPrefix("/overview").Subrouter() - // Celery performance routes - celeryRouter.HandleFunc("/performance", am.ViewAccess(aH.getCeleryPerformance)).Methods(http.MethodPost) - - // for other messaging queues, add SubRouters here + overviewRouter.HandleFunc("/list", am.ViewAccess(aH.getDomainList)).Methods(http.MethodPost) + overviewRouter.HandleFunc("/domain", am.ViewAccess(aH.getDomainInfo)).Methods(http.MethodPost) } // not using md5 hashing as the plain string would work @@ -3493,24 +3492,6 @@ func (aH *APIHandler) getProducerConsumerEval( aH.Respond(w, resp) } -// ParseKafkaQueueBody parse for messaging queue params -func ParseKafkaQueueBody(r *http.Request) (*kafka.MessagingQueue, *model.ApiError) { - messagingQueue := new(kafka.MessagingQueue) - if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil { - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} - } - return messagingQueue, nil -} - -// ParseQueueBody parses for any queue -func ParseQueueBody(r *http.Request) (*queues2.QueueListRequest, *model.ApiError) { - queue := new(queues2.QueueListRequest) - if err := json.NewDecoder(r.Body).Decode(queue); err != nil { - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} - } - return queue, nil -} - // Preferences func (aH *APIHandler) getUserPreference( @@ -5515,13 +5496,78 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) { aH.Respond(w, results) } -func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) getDomainList(w http.ResponseWriter, r *http.Request) { + thirdPartyQueryRequest, apiErr := ParseRequestBody(r) + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := thirdPartyApi.BuildDomainList(thirdPartyQueryRequest) + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + result = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) + + if !thirdPartyQueryRequest.ShowIp { + result = thirdPartyApi.FilterResponse(result) + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) } -func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) { - // TODO: Implement celery tasks logic for both state and list types -} +func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) { + thirdPartyQueryRequest, apiErr := ParseRequestBody(r) -func (aH *APIHandler) getCeleryPerformance(w http.ResponseWriter, r *http.Request) { - // TODO: Implement celery performance logic for error, rate, and latency types + if apiErr != nil { + zap.L().Error(apiErr.Err.Error()) + RespondError(w, apiErr, nil) + return + } + + queryRangeParams, err := thirdPartyApi.BuildDomainInfo(thirdPartyQueryRequest) + + if err := validateQueryRangeParamsV3(queryRangeParams); err != nil { + zap.L().Error(err.Error()) + RespondError(w, apiErr, nil) + return + } + + var result []*v3.Result + var errQuriesByName map[string]error + + result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams) + if err != nil { + apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErrObj, errQuriesByName) + return + } + + result = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams) + + if !thirdPartyQueryRequest.ShowIp { + result = thirdPartyApi.FilterResponse(result) + } + + resp := v3.QueryRangeResponse{ + Result: result, + } + aH.Respond(w, resp) } diff --git a/pkg/query-service/app/integrations/messagingQueues/celery/translator.go b/pkg/query-service/app/integrations/messagingQueues/celery/translator.go deleted file mode 100644 index bf7e67e150..0000000000 --- a/pkg/query-service/app/integrations/messagingQueues/celery/translator.go +++ /dev/null @@ -1 +0,0 @@ -package celery diff --git a/pkg/query-service/app/integrations/thirdPartyApi/model.go b/pkg/query-service/app/integrations/thirdPartyApi/model.go new file mode 100644 index 0000000000..7ea5fc86d8 --- /dev/null +++ b/pkg/query-service/app/integrations/thirdPartyApi/model.go @@ -0,0 +1,13 @@ +package thirdPartyApi + +import v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + +type ThirdPartyApis struct { + Start int64 `json:"start"` + End int64 `json:"end"` + ShowIp bool `json:"show_ip,omitempty"` + Domain int64 `json:"domain,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + Filters v3.FilterSet `json:"filters,omitempty"` + GroupBy []v3.AttributeKey `json:"groupBy,omitempty"` +} diff --git a/pkg/query-service/app/integrations/thirdPartyApi/translator.go b/pkg/query-service/app/integrations/thirdPartyApi/translator.go new file mode 100644 index 0000000000..f0e66552fe --- /dev/null +++ b/pkg/query-service/app/integrations/thirdPartyApi/translator.go @@ -0,0 +1,337 @@ +package thirdPartyApi + +import ( + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "net" +) + +var defaultStepInterval int64 = 60 + +func FilterResponse(results []*v3.Result) []*v3.Result { + filteredResults := make([]*v3.Result, 0, len(results)) + + for _, res := range results { + if res.Table == nil { + continue + } + filteredRows := make([]*v3.TableRow, 0, len(res.Table.Rows)) + for _, row := range res.Table.Rows { + if row.Data != nil { + if domainVal, ok := row.Data["net.peer.name"]; ok { + if domainStr, ok := domainVal.(string); ok { + if net.ParseIP(domainStr) != nil { + continue + } + } + } + } + filteredRows = append(filteredRows, row) + } + res.Table.Rows = filteredRows + + filteredResults = append(filteredResults, res) + } + + return filteredResults +} + +func getFilterSet(existingFilters []v3.FilterItem, apiFilters v3.FilterSet) []v3.FilterItem { + if len(apiFilters.Items) != 0 { + existingFilters = append(existingFilters, apiFilters.Items...) + } + return existingFilters +} + +func getGroupBy(existingGroupBy []v3.AttributeKey, apiGroupBy []v3.AttributeKey) []v3.AttributeKey { + if len(apiGroupBy) != 0 { + existingGroupBy = append(existingGroupBy, apiGroupBy...) + } + return existingGroupBy +} + +func BuildDomainList(thirdPartyApis *ThirdPartyApis) (*v3.QueryRangeParamsV3, error) { + + unixMilliStart := thirdPartyApis.Start + unixMilliEnd := thirdPartyApis.End + + builderQueries := make(map[string]*v3.BuilderQuery) + + builderQueries["endpoints"] = &v3.BuilderQuery{ + QueryName: "endpoints", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorCountDistinct, + AggregateAttribute: v3.AttributeKey{ + Key: "http.url", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + TimeAggregation: v3.TimeAggregationCountDistinct, + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters), + }, + Expression: "endpoints", + GroupBy: getGroupBy([]v3.AttributeKey{ + { + Key: "net.peer.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, thirdPartyApis.GroupBy), + ReduceTo: v3.ReduceToOperatorAvg, + } + + builderQueries["lastseen"] = &v3.BuilderQuery{ + QueryName: "lastseen", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorMax, + AggregateAttribute: v3.AttributeKey{ + Key: "timestamp", + }, + TimeAggregation: v3.TimeAggregationMax, + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters), + }, + Expression: "lastseen", + GroupBy: getGroupBy([]v3.AttributeKey{ + { + Key: "net.peer.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, thirdPartyApis.GroupBy), + ReduceTo: v3.ReduceToOperatorAvg, + } + + builderQueries["rps"] = &v3.BuilderQuery{ + QueryName: "rps", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorRate, + AggregateAttribute: v3.AttributeKey{ + Key: "", + }, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters), + }, + Expression: "rps", + GroupBy: getGroupBy([]v3.AttributeKey{ + { + Key: "net.peer.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, thirdPartyApis.GroupBy), + ReduceTo: v3.ReduceToOperatorAvg, + } + + builderQueries["error_rate"] = &v3.BuilderQuery{ + QueryName: "error_rate", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorRate, + AggregateAttribute: v3.AttributeKey{ + Key: "", + }, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "has_error", + DataType: v3.AttributeKeyDataTypeBool, + IsColumn: true, + }, + Operator: "=", + Value: "true", + }, + }, thirdPartyApis.Filters), + }, + Expression: "error_rate", + GroupBy: getGroupBy([]v3.AttributeKey{ + { + Key: "net.peer.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, thirdPartyApis.GroupBy), + ReduceTo: v3.ReduceToOperatorAvg, + } + + builderQueries["p99"] = &v3.BuilderQuery{ + QueryName: "p99", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorP99, + AggregateAttribute: v3.AttributeKey{ + Key: "duration_nano", + DataType: v3.AttributeKeyDataTypeFloat64, + IsColumn: true, + }, + TimeAggregation: "p99", + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters), + }, + Expression: "p99", + GroupBy: getGroupBy([]v3.AttributeKey{ + { + Key: "net.peer.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, thirdPartyApis.GroupBy), + ReduceTo: v3.ReduceToOperatorAvg, + } + + compositeQuery := &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + FillGaps: false, + BuilderQueries: builderQueries, + } + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: unixMilliStart, + End: unixMilliEnd, + Step: defaultStepInterval, + CompositeQuery: compositeQuery, + Version: "v4", + FormatForWeb: true, + } + + return queryRangeParams, nil +} + +func BuildDomainInfo(thirdPartyApis *ThirdPartyApis) (*v3.QueryRangeParamsV3, error) { + unixMilliStart := thirdPartyApis.Start + unixMilliEnd := thirdPartyApis.End + + builderQueries := make(map[string]*v3.BuilderQuery) + + builderQueries["endpoints"] = &v3.BuilderQuery{ + QueryName: "endpoints", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorCount, + AggregateAttribute: v3.AttributeKey{ + Key: "http.url", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters), + }, + Expression: "endpoints", + Disabled: false, + GroupBy: getGroupBy([]v3.AttributeKey{ + { + Key: "http.url", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + }, thirdPartyApis.GroupBy), + Legend: "", + ReduceTo: v3.ReduceToOperatorAvg, + } + + builderQueries["p99"] = &v3.BuilderQuery{ + QueryName: "p99", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorP99, + AggregateAttribute: v3.AttributeKey{ + Key: "duration_nano", + DataType: v3.AttributeKeyDataTypeFloat64, + IsColumn: true, + }, + TimeAggregation: "p99", + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters), + }, + Expression: "p99", + Disabled: false, + Having: nil, + GroupBy: getGroupBy([]v3.AttributeKey{}, thirdPartyApis.GroupBy), + Legend: "", + ReduceTo: v3.ReduceToOperatorAvg, + } + + builderQueries["error_rate"] = &v3.BuilderQuery{ + QueryName: "error_rate", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorRate, + AggregateAttribute: v3.AttributeKey{ + Key: "", + }, + TimeAggregation: v3.TimeAggregationRate, + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters), + }, + Expression: "error_rate", + Disabled: false, + GroupBy: getGroupBy([]v3.AttributeKey{}, thirdPartyApis.GroupBy), + Legend: "", + ReduceTo: v3.ReduceToOperatorAvg, + } + + builderQueries["lastseen"] = &v3.BuilderQuery{ + QueryName: "lastseen", + DataSource: v3.DataSourceTraces, + StepInterval: defaultStepInterval, + AggregateOperator: v3.AggregateOperatorMax, + AggregateAttribute: v3.AttributeKey{ + Key: "timestamp", + }, + TimeAggregation: v3.TimeAggregationMax, + SpaceAggregation: v3.SpaceAggregationSum, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters), + }, + Expression: "lastseen", + Disabled: false, + Having: nil, + OrderBy: nil, + GroupBy: getGroupBy([]v3.AttributeKey{}, thirdPartyApis.GroupBy), + Legend: "", + ReduceTo: v3.ReduceToOperatorAvg, + } + + compositeQuery := &v3.CompositeQuery{ + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + FillGaps: false, + BuilderQueries: builderQueries, + } + + queryRangeParams := &v3.QueryRangeParamsV3{ + Start: unixMilliStart, + End: unixMilliEnd, + Step: defaultStepInterval, + CompositeQuery: compositeQuery, + Version: "v4", + FormatForWeb: true, + } + + return queryRangeParams, nil +} diff --git a/pkg/query-service/app/integrations/thirdPartyApi/translator_test.go b/pkg/query-service/app/integrations/thirdPartyApi/translator_test.go new file mode 100644 index 0000000000..188af606bc --- /dev/null +++ b/pkg/query-service/app/integrations/thirdPartyApi/translator_test.go @@ -0,0 +1,267 @@ +package thirdPartyApi + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestFilterResponse(t *testing.T) { + tests := []struct { + name string + input []*v3.Result + expected []*v3.Result + }{ + { + name: "should filter out IP addresses from net.peer.name", + input: []*v3.Result{ + { + Table: &v3.Table{ + Rows: []*v3.TableRow{ + { + Data: map[string]interface{}{ + "net.peer.name": "192.168.1.1", + }, + }, + { + Data: map[string]interface{}{ + "net.peer.name": "example.com", + }, + }, + }, + }, + }, + }, + expected: []*v3.Result{ + { + Table: &v3.Table{ + Rows: []*v3.TableRow{ + { + Data: map[string]interface{}{ + "net.peer.name": "example.com", + }, + }, + }, + }, + }, + }, + }, + { + name: "should handle nil data", + input: []*v3.Result{ + { + Table: &v3.Table{ + Rows: []*v3.TableRow{ + { + Data: nil, + }, + }, + }, + }, + }, + expected: []*v3.Result{ + { + Table: &v3.Table{ + Rows: []*v3.TableRow{ + { + Data: nil, + }, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := FilterResponse(tt.input) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetFilterSet(t *testing.T) { + tests := []struct { + name string + existingFilters []v3.FilterItem + apiFilters v3.FilterSet + expected []v3.FilterItem + }{ + { + name: "should append new filters", + existingFilters: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "existing"}, + }, + }, + apiFilters: v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "new"}, + }, + }, + }, + expected: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "existing"}, + }, + { + Key: v3.AttributeKey{Key: "new"}, + }, + }, + }, + { + name: "should handle empty api filters", + existingFilters: []v3.FilterItem{{Key: v3.AttributeKey{Key: "existing"}}}, + apiFilters: v3.FilterSet{}, + expected: []v3.FilterItem{{Key: v3.AttributeKey{Key: "existing"}}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getFilterSet(tt.existingFilters, tt.apiFilters) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestGetGroupBy(t *testing.T) { + tests := []struct { + name string + existingGroup []v3.AttributeKey + apiGroup []v3.AttributeKey + expected []v3.AttributeKey + }{ + { + name: "should append new group by attributes", + existingGroup: []v3.AttributeKey{ + {Key: "existing"}, + }, + apiGroup: []v3.AttributeKey{ + {Key: "new"}, + }, + expected: []v3.AttributeKey{ + {Key: "existing"}, + {Key: "new"}, + }, + }, + { + name: "should handle empty api group", + existingGroup: []v3.AttributeKey{{Key: "existing"}}, + apiGroup: []v3.AttributeKey{}, + expected: []v3.AttributeKey{{Key: "existing"}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := getGroupBy(tt.existingGroup, tt.apiGroup) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestBuildDomainList(t *testing.T) { + tests := []struct { + name string + input *ThirdPartyApis + wantErr bool + }{ + { + name: "basic domain list query", + input: &ThirdPartyApis{ + Start: 1000, + End: 2000, + }, + wantErr: false, + }, + { + name: "with filters and group by", + input: &ThirdPartyApis{ + Start: 1000, + End: 2000, + Filters: v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "test"}, + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "test"}, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := BuildDomainList(tt.input) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, tt.input.Start, result.Start) + assert.Equal(t, tt.input.End, result.End) + assert.NotNil(t, result.CompositeQuery) + assert.NotNil(t, result.CompositeQuery.BuilderQueries) + }) + } +} + +func TestBuildDomainInfo(t *testing.T) { + tests := []struct { + name string + input *ThirdPartyApis + wantErr bool + }{ + { + name: "basic domain info query", + input: &ThirdPartyApis{ + Start: 1000, + End: 2000, + }, + wantErr: false, + }, + { + name: "with filters and group by", + input: &ThirdPartyApis{ + Start: 1000, + End: 2000, + Filters: v3.FilterSet{ + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{Key: "test"}, + }, + }, + }, + GroupBy: []v3.AttributeKey{ + {Key: "test"}, + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := BuildDomainInfo(tt.input) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Equal(t, tt.input.Start, result.Start) + assert.Equal(t, tt.input.End, result.End) + assert.NotNil(t, result.CompositeQuery) + assert.NotNil(t, result.CompositeQuery.BuilderQueries) + }) + } +} diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 8cfc621662..d1b6f2f23f 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -6,6 +6,9 @@ import ( "encoding/json" "errors" "fmt" + "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka" + queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues" + "go.signoz.io/signoz/pkg/query-service/app/integrations/thirdPartyApi" "math" "net/http" "strconv" @@ -1007,3 +1010,30 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE return queryRangeParams, nil } + +// ParseKafkaQueueBody parse for messaging queue params +func ParseKafkaQueueBody(r *http.Request) (*kafka.MessagingQueue, *model.ApiError) { + messagingQueue := new(kafka.MessagingQueue) + if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} + } + return messagingQueue, nil +} + +// ParseQueueBody parses for any queue +func ParseQueueBody(r *http.Request) (*queues2.QueueListRequest, *model.ApiError) { + queue := new(queues2.QueueListRequest) + if err := json.NewDecoder(r.Body).Decode(queue); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} + } + return queue, nil +} + +// ParseRequestBody for third party APIs +func ParseRequestBody(r *http.Request) (*thirdPartyApi.ThirdPartyApis, *model.ApiError) { + thirdPartApis := new(thirdPartyApi.ThirdPartyApis) + if err := json.NewDecoder(r.Body).Decode(thirdPartApis); err != nil { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)} + } + return thirdPartApis, nil +} diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 3101bf9306..a394a1045b 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -315,6 +315,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server, api.RegisterWebSocketPaths(r, am) api.RegisterQueryRangeV4Routes(r, am) api.RegisterMessagingQueuesRoutes(r, am) + api.RegisterThirdPartyApiRoutes(r, am) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, diff --git a/pkg/query-service/app/traces/v3/query_builder.go b/pkg/query-service/app/traces/v3/query_builder.go index 067a53c08b..1e1461c7dd 100644 --- a/pkg/query-service/app/traces/v3/query_builder.go +++ b/pkg/query-service/app/traces/v3/query_builder.go @@ -297,6 +297,9 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan aggregationKey := "" if mq.AggregateAttribute.Key != "" { aggregationKey = getColumnName(mq.AggregateAttribute) + if mq.AggregateAttribute.Key == "timestamp" { + aggregationKey = "toUnixTimestamp64Nano(timestamp)" + } } switch mq.AggregateOperator { diff --git a/pkg/query-service/app/traces/v4/query_builder.go b/pkg/query-service/app/traces/v4/query_builder.go index 74223fdefa..b2bb62f0eb 100644 --- a/pkg/query-service/app/traces/v4/query_builder.go +++ b/pkg/query-service/app/traces/v4/query_builder.go @@ -328,6 +328,9 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3. aggregationKey := "" if mq.AggregateAttribute.Key != "" { aggregationKey = getColumnName(mq.AggregateAttribute) + if mq.AggregateAttribute.Key == "timestamp" { + aggregationKey = "toUnixTimestamp64Nano(timestamp)" + } } var queryTmpl string diff --git a/pkg/query-service/app/traces/v4/query_builder_test.go b/pkg/query-service/app/traces/v4/query_builder_test.go index 23eb29baa7..1a3ca9e5e7 100644 --- a/pkg/query-service/app/traces/v4/query_builder_test.go +++ b/pkg/query-service/app/traces/v4/query_builder_test.go @@ -678,6 +678,21 @@ func Test_buildTracesQuery(t *testing.T) { want: "SELECT toFloat64(count(distinct(name))) as value from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND " + "(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) having value > 10 order by value ASC", }, + { + name: "Test timestamp as aggregate attribute key", + args: args{ + panelType: v3.PanelTypeTable, + start: 1680066360726210000, + end: 1680066458000000000, + mq: &v3.BuilderQuery{ + AggregateOperator: v3.AggregateOperatorMax, + Filters: &v3.FilterSet{}, + AggregateAttribute: v3.AttributeKey{Key: "timestamp", IsColumn: false, Type: v3.AttributeKeyTypeTag}, + }, + }, + want: "SELECT max(toUnixTimestamp64Nano(timestamp)) as value from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND " + + "(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) order by value DESC", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {