feat: add APIs for third party api feat (#7005)

* feat: add APIs for third party api feat

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

* fix: minor fixes

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

* test: add unit tests

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

* chore: minor changes

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

* fix: review comments

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

* test: add unit tests

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

* chore: cleanup

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

* chore: review comments

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

* chore: review comments

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>

---------

Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
This commit is contained in:
Shivanshu Raj Shrivastava 2025-02-19 16:08:58 +05:30 committed by GitHub
parent 3b6952abf2
commit 407654e68e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 751 additions and 36 deletions

View File

@ -374,6 +374,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler, web web.Web) (*h
apiHandler.RegisterQueryRangeV4Routes(r, am)
apiHandler.RegisterWebSocketPaths(r, am)
apiHandler.RegisterMessagingQueuesRoutes(r, am)
apiHandler.RegisterThirdPartyApiRoutes(r, am)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},

View File

@ -31,6 +31,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/inframetrics"
"go.signoz.io/signoz/pkg/query-service/app/integrations"
queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
"go.signoz.io/signoz/pkg/query-service/app/integrations/thirdPartyApi"
"go.signoz.io/signoz/pkg/query-service/app/logs"
logsv3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
logsv4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4"
@ -2600,21 +2601,19 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth
spanEvaluation := kafkaRouter.PathPrefix("/span").Subrouter()
spanEvaluation.HandleFunc("/evaluation", am.ViewAccess(aH.getProducerConsumerEval)).Methods(http.MethodPost)
}
// -------------------------------------------------
// Celery-specific routes
celeryRouter := messagingQueuesRouter.PathPrefix("/celery").Subrouter()
// RegisterThirdPartyApiRoutes adds third-party-api integration routes
func (aH *APIHandler) RegisterThirdPartyApiRoutes(router *mux.Router, am *AuthMiddleware) {
// Celery overview routes
celeryRouter.HandleFunc("/overview", am.ViewAccess(aH.getCeleryOverview)).Methods(http.MethodPost)
// Main messaging queues router
thirdPartyApiRouter := router.PathPrefix("/api/v1/third-party-apis").Subrouter()
// Celery tasks routes
celeryRouter.HandleFunc("/tasks", am.ViewAccess(aH.getCeleryTasks)).Methods(http.MethodPost)
// Domain Overview route
overviewRouter := thirdPartyApiRouter.PathPrefix("/overview").Subrouter()
// Celery performance routes
celeryRouter.HandleFunc("/performance", am.ViewAccess(aH.getCeleryPerformance)).Methods(http.MethodPost)
// for other messaging queues, add SubRouters here
overviewRouter.HandleFunc("/list", am.ViewAccess(aH.getDomainList)).Methods(http.MethodPost)
overviewRouter.HandleFunc("/domain", am.ViewAccess(aH.getDomainInfo)).Methods(http.MethodPost)
}
// not using md5 hashing as the plain string would work
@ -3493,24 +3492,6 @@ func (aH *APIHandler) getProducerConsumerEval(
aH.Respond(w, resp)
}
// ParseKafkaQueueBody parse for messaging queue params
func ParseKafkaQueueBody(r *http.Request) (*kafka.MessagingQueue, *model.ApiError) {
messagingQueue := new(kafka.MessagingQueue)
if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
}
return messagingQueue, nil
}
// ParseQueueBody parses for any queue
func ParseQueueBody(r *http.Request) (*queues2.QueueListRequest, *model.ApiError) {
queue := new(queues2.QueueListRequest)
if err := json.NewDecoder(r.Body).Decode(queue); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
}
return queue, nil
}
// Preferences
func (aH *APIHandler) getUserPreference(
@ -5515,13 +5496,78 @@ func (aH *APIHandler) getQueueOverview(w http.ResponseWriter, r *http.Request) {
aH.Respond(w, results)
}
func (aH *APIHandler) getCeleryOverview(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) getDomainList(w http.ResponseWriter, r *http.Request) {
thirdPartyQueryRequest, apiErr := ParseRequestBody(r)
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}
queryRangeParams, err := thirdPartyApi.BuildDomainList(thirdPartyQueryRequest)
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
var result []*v3.Result
var errQuriesByName map[string]error
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
result = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams)
if !thirdPartyQueryRequest.ShowIp {
result = thirdPartyApi.FilterResponse(result)
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}
func (aH *APIHandler) getCeleryTasks(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery tasks logic for both state and list types
}
func (aH *APIHandler) getDomainInfo(w http.ResponseWriter, r *http.Request) {
thirdPartyQueryRequest, apiErr := ParseRequestBody(r)
func (aH *APIHandler) getCeleryPerformance(w http.ResponseWriter, r *http.Request) {
// TODO: Implement celery performance logic for error, rate, and latency types
if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}
queryRangeParams, err := thirdPartyApi.BuildDomainInfo(thirdPartyQueryRequest)
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
var result []*v3.Result
var errQuriesByName map[string]error
result, errQuriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQuriesByName)
return
}
result = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams)
if !thirdPartyQueryRequest.ShowIp {
result = thirdPartyApi.FilterResponse(result)
}
resp := v3.QueryRangeResponse{
Result: result,
}
aH.Respond(w, resp)
}

View File

@ -0,0 +1,13 @@
package thirdPartyApi
import v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
type ThirdPartyApis struct {
Start int64 `json:"start"`
End int64 `json:"end"`
ShowIp bool `json:"show_ip,omitempty"`
Domain int64 `json:"domain,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
Filters v3.FilterSet `json:"filters,omitempty"`
GroupBy []v3.AttributeKey `json:"groupBy,omitempty"`
}

View File

@ -0,0 +1,337 @@
package thirdPartyApi
import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"net"
)
var defaultStepInterval int64 = 60
func FilterResponse(results []*v3.Result) []*v3.Result {
filteredResults := make([]*v3.Result, 0, len(results))
for _, res := range results {
if res.Table == nil {
continue
}
filteredRows := make([]*v3.TableRow, 0, len(res.Table.Rows))
for _, row := range res.Table.Rows {
if row.Data != nil {
if domainVal, ok := row.Data["net.peer.name"]; ok {
if domainStr, ok := domainVal.(string); ok {
if net.ParseIP(domainStr) != nil {
continue
}
}
}
}
filteredRows = append(filteredRows, row)
}
res.Table.Rows = filteredRows
filteredResults = append(filteredResults, res)
}
return filteredResults
}
func getFilterSet(existingFilters []v3.FilterItem, apiFilters v3.FilterSet) []v3.FilterItem {
if len(apiFilters.Items) != 0 {
existingFilters = append(existingFilters, apiFilters.Items...)
}
return existingFilters
}
func getGroupBy(existingGroupBy []v3.AttributeKey, apiGroupBy []v3.AttributeKey) []v3.AttributeKey {
if len(apiGroupBy) != 0 {
existingGroupBy = append(existingGroupBy, apiGroupBy...)
}
return existingGroupBy
}
func BuildDomainList(thirdPartyApis *ThirdPartyApis) (*v3.QueryRangeParamsV3, error) {
unixMilliStart := thirdPartyApis.Start
unixMilliEnd := thirdPartyApis.End
builderQueries := make(map[string]*v3.BuilderQuery)
builderQueries["endpoints"] = &v3.BuilderQuery{
QueryName: "endpoints",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorCountDistinct,
AggregateAttribute: v3.AttributeKey{
Key: "http.url",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
TimeAggregation: v3.TimeAggregationCountDistinct,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters),
},
Expression: "endpoints",
GroupBy: getGroupBy([]v3.AttributeKey{
{
Key: "net.peer.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
}, thirdPartyApis.GroupBy),
ReduceTo: v3.ReduceToOperatorAvg,
}
builderQueries["lastseen"] = &v3.BuilderQuery{
QueryName: "lastseen",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorMax,
AggregateAttribute: v3.AttributeKey{
Key: "timestamp",
},
TimeAggregation: v3.TimeAggregationMax,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters),
},
Expression: "lastseen",
GroupBy: getGroupBy([]v3.AttributeKey{
{
Key: "net.peer.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
}, thirdPartyApis.GroupBy),
ReduceTo: v3.ReduceToOperatorAvg,
}
builderQueries["rps"] = &v3.BuilderQuery{
QueryName: "rps",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorRate,
AggregateAttribute: v3.AttributeKey{
Key: "",
},
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters),
},
Expression: "rps",
GroupBy: getGroupBy([]v3.AttributeKey{
{
Key: "net.peer.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
}, thirdPartyApis.GroupBy),
ReduceTo: v3.ReduceToOperatorAvg,
}
builderQueries["error_rate"] = &v3.BuilderQuery{
QueryName: "error_rate",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorRate,
AggregateAttribute: v3.AttributeKey{
Key: "",
},
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{
{
Key: v3.AttributeKey{
Key: "has_error",
DataType: v3.AttributeKeyDataTypeBool,
IsColumn: true,
},
Operator: "=",
Value: "true",
},
}, thirdPartyApis.Filters),
},
Expression: "error_rate",
GroupBy: getGroupBy([]v3.AttributeKey{
{
Key: "net.peer.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
}, thirdPartyApis.GroupBy),
ReduceTo: v3.ReduceToOperatorAvg,
}
builderQueries["p99"] = &v3.BuilderQuery{
QueryName: "p99",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorP99,
AggregateAttribute: v3.AttributeKey{
Key: "duration_nano",
DataType: v3.AttributeKeyDataTypeFloat64,
IsColumn: true,
},
TimeAggregation: "p99",
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters),
},
Expression: "p99",
GroupBy: getGroupBy([]v3.AttributeKey{
{
Key: "net.peer.name",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
}, thirdPartyApis.GroupBy),
ReduceTo: v3.ReduceToOperatorAvg,
}
compositeQuery := &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
FillGaps: false,
BuilderQueries: builderQueries,
}
queryRangeParams := &v3.QueryRangeParamsV3{
Start: unixMilliStart,
End: unixMilliEnd,
Step: defaultStepInterval,
CompositeQuery: compositeQuery,
Version: "v4",
FormatForWeb: true,
}
return queryRangeParams, nil
}
func BuildDomainInfo(thirdPartyApis *ThirdPartyApis) (*v3.QueryRangeParamsV3, error) {
unixMilliStart := thirdPartyApis.Start
unixMilliEnd := thirdPartyApis.End
builderQueries := make(map[string]*v3.BuilderQuery)
builderQueries["endpoints"] = &v3.BuilderQuery{
QueryName: "endpoints",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorCount,
AggregateAttribute: v3.AttributeKey{
Key: "http.url",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters),
},
Expression: "endpoints",
Disabled: false,
GroupBy: getGroupBy([]v3.AttributeKey{
{
Key: "http.url",
DataType: v3.AttributeKeyDataTypeString,
Type: v3.AttributeKeyTypeTag,
},
}, thirdPartyApis.GroupBy),
Legend: "",
ReduceTo: v3.ReduceToOperatorAvg,
}
builderQueries["p99"] = &v3.BuilderQuery{
QueryName: "p99",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorP99,
AggregateAttribute: v3.AttributeKey{
Key: "duration_nano",
DataType: v3.AttributeKeyDataTypeFloat64,
IsColumn: true,
},
TimeAggregation: "p99",
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters),
},
Expression: "p99",
Disabled: false,
Having: nil,
GroupBy: getGroupBy([]v3.AttributeKey{}, thirdPartyApis.GroupBy),
Legend: "",
ReduceTo: v3.ReduceToOperatorAvg,
}
builderQueries["error_rate"] = &v3.BuilderQuery{
QueryName: "error_rate",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorRate,
AggregateAttribute: v3.AttributeKey{
Key: "",
},
TimeAggregation: v3.TimeAggregationRate,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters),
},
Expression: "error_rate",
Disabled: false,
GroupBy: getGroupBy([]v3.AttributeKey{}, thirdPartyApis.GroupBy),
Legend: "",
ReduceTo: v3.ReduceToOperatorAvg,
}
builderQueries["lastseen"] = &v3.BuilderQuery{
QueryName: "lastseen",
DataSource: v3.DataSourceTraces,
StepInterval: defaultStepInterval,
AggregateOperator: v3.AggregateOperatorMax,
AggregateAttribute: v3.AttributeKey{
Key: "timestamp",
},
TimeAggregation: v3.TimeAggregationMax,
SpaceAggregation: v3.SpaceAggregationSum,
Filters: &v3.FilterSet{
Operator: "AND",
Items: getFilterSet([]v3.FilterItem{}, thirdPartyApis.Filters),
},
Expression: "lastseen",
Disabled: false,
Having: nil,
OrderBy: nil,
GroupBy: getGroupBy([]v3.AttributeKey{}, thirdPartyApis.GroupBy),
Legend: "",
ReduceTo: v3.ReduceToOperatorAvg,
}
compositeQuery := &v3.CompositeQuery{
QueryType: v3.QueryTypeBuilder,
PanelType: v3.PanelTypeTable,
FillGaps: false,
BuilderQueries: builderQueries,
}
queryRangeParams := &v3.QueryRangeParamsV3{
Start: unixMilliStart,
End: unixMilliEnd,
Step: defaultStepInterval,
CompositeQuery: compositeQuery,
Version: "v4",
FormatForWeb: true,
}
return queryRangeParams, nil
}

View File

@ -0,0 +1,267 @@
package thirdPartyApi
import (
"testing"
"github.com/stretchr/testify/assert"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
func TestFilterResponse(t *testing.T) {
tests := []struct {
name string
input []*v3.Result
expected []*v3.Result
}{
{
name: "should filter out IP addresses from net.peer.name",
input: []*v3.Result{
{
Table: &v3.Table{
Rows: []*v3.TableRow{
{
Data: map[string]interface{}{
"net.peer.name": "192.168.1.1",
},
},
{
Data: map[string]interface{}{
"net.peer.name": "example.com",
},
},
},
},
},
},
expected: []*v3.Result{
{
Table: &v3.Table{
Rows: []*v3.TableRow{
{
Data: map[string]interface{}{
"net.peer.name": "example.com",
},
},
},
},
},
},
},
{
name: "should handle nil data",
input: []*v3.Result{
{
Table: &v3.Table{
Rows: []*v3.TableRow{
{
Data: nil,
},
},
},
},
},
expected: []*v3.Result{
{
Table: &v3.Table{
Rows: []*v3.TableRow{
{
Data: nil,
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := FilterResponse(tt.input)
assert.Equal(t, tt.expected, result)
})
}
}
func TestGetFilterSet(t *testing.T) {
tests := []struct {
name string
existingFilters []v3.FilterItem
apiFilters v3.FilterSet
expected []v3.FilterItem
}{
{
name: "should append new filters",
existingFilters: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "existing"},
},
},
apiFilters: v3.FilterSet{
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "new"},
},
},
},
expected: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "existing"},
},
{
Key: v3.AttributeKey{Key: "new"},
},
},
},
{
name: "should handle empty api filters",
existingFilters: []v3.FilterItem{{Key: v3.AttributeKey{Key: "existing"}}},
apiFilters: v3.FilterSet{},
expected: []v3.FilterItem{{Key: v3.AttributeKey{Key: "existing"}}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := getFilterSet(tt.existingFilters, tt.apiFilters)
assert.Equal(t, tt.expected, result)
})
}
}
func TestGetGroupBy(t *testing.T) {
tests := []struct {
name string
existingGroup []v3.AttributeKey
apiGroup []v3.AttributeKey
expected []v3.AttributeKey
}{
{
name: "should append new group by attributes",
existingGroup: []v3.AttributeKey{
{Key: "existing"},
},
apiGroup: []v3.AttributeKey{
{Key: "new"},
},
expected: []v3.AttributeKey{
{Key: "existing"},
{Key: "new"},
},
},
{
name: "should handle empty api group",
existingGroup: []v3.AttributeKey{{Key: "existing"}},
apiGroup: []v3.AttributeKey{},
expected: []v3.AttributeKey{{Key: "existing"}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := getGroupBy(tt.existingGroup, tt.apiGroup)
assert.Equal(t, tt.expected, result)
})
}
}
func TestBuildDomainList(t *testing.T) {
tests := []struct {
name string
input *ThirdPartyApis
wantErr bool
}{
{
name: "basic domain list query",
input: &ThirdPartyApis{
Start: 1000,
End: 2000,
},
wantErr: false,
},
{
name: "with filters and group by",
input: &ThirdPartyApis{
Start: 1000,
End: 2000,
Filters: v3.FilterSet{
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "test"},
},
},
},
GroupBy: []v3.AttributeKey{
{Key: "test"},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := BuildDomainList(tt.input)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, tt.input.Start, result.Start)
assert.Equal(t, tt.input.End, result.End)
assert.NotNil(t, result.CompositeQuery)
assert.NotNil(t, result.CompositeQuery.BuilderQueries)
})
}
}
func TestBuildDomainInfo(t *testing.T) {
tests := []struct {
name string
input *ThirdPartyApis
wantErr bool
}{
{
name: "basic domain info query",
input: &ThirdPartyApis{
Start: 1000,
End: 2000,
},
wantErr: false,
},
{
name: "with filters and group by",
input: &ThirdPartyApis{
Start: 1000,
End: 2000,
Filters: v3.FilterSet{
Items: []v3.FilterItem{
{
Key: v3.AttributeKey{Key: "test"},
},
},
},
GroupBy: []v3.AttributeKey{
{Key: "test"},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := BuildDomainInfo(tt.input)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, tt.input.Start, result.Start)
assert.Equal(t, tt.input.End, result.End)
assert.NotNil(t, result.CompositeQuery)
assert.NotNil(t, result.CompositeQuery.BuilderQueries)
})
}
}

View File

@ -6,6 +6,9 @@ import (
"encoding/json"
"errors"
"fmt"
"go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/kafka"
queues2 "go.signoz.io/signoz/pkg/query-service/app/integrations/messagingQueues/queues"
"go.signoz.io/signoz/pkg/query-service/app/integrations/thirdPartyApi"
"math"
"net/http"
"strconv"
@ -1007,3 +1010,30 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
return queryRangeParams, nil
}
// ParseKafkaQueueBody parse for messaging queue params
func ParseKafkaQueueBody(r *http.Request) (*kafka.MessagingQueue, *model.ApiError) {
messagingQueue := new(kafka.MessagingQueue)
if err := json.NewDecoder(r.Body).Decode(messagingQueue); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
}
return messagingQueue, nil
}
// ParseQueueBody parses for any queue
func ParseQueueBody(r *http.Request) (*queues2.QueueListRequest, *model.ApiError) {
queue := new(queues2.QueueListRequest)
if err := json.NewDecoder(r.Body).Decode(queue); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
}
return queue, nil
}
// ParseRequestBody for third party APIs
func ParseRequestBody(r *http.Request) (*thirdPartyApi.ThirdPartyApis, *model.ApiError) {
thirdPartApis := new(thirdPartyApi.ThirdPartyApis)
if err := json.NewDecoder(r.Body).Decode(thirdPartApis); err != nil {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("cannot parse the request body: %v", err)}
}
return thirdPartApis, nil
}

View File

@ -315,6 +315,7 @@ func (s *Server) createPublicServer(api *APIHandler, web web.Web) (*http.Server,
api.RegisterWebSocketPaths(r, am)
api.RegisterQueryRangeV4Routes(r, am)
api.RegisterMessagingQueuesRoutes(r, am)
api.RegisterThirdPartyApiRoutes(r, am)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},

View File

@ -297,6 +297,9 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, _ string, pan
aggregationKey := ""
if mq.AggregateAttribute.Key != "" {
aggregationKey = getColumnName(mq.AggregateAttribute)
if mq.AggregateAttribute.Key == "timestamp" {
aggregationKey = "toUnixTimestamp64Nano(timestamp)"
}
}
switch mq.AggregateOperator {

View File

@ -328,6 +328,9 @@ func buildTracesQuery(start, end, step int64, mq *v3.BuilderQuery, panelType v3.
aggregationKey := ""
if mq.AggregateAttribute.Key != "" {
aggregationKey = getColumnName(mq.AggregateAttribute)
if mq.AggregateAttribute.Key == "timestamp" {
aggregationKey = "toUnixTimestamp64Nano(timestamp)"
}
}
var queryTmpl string

View File

@ -678,6 +678,21 @@ func Test_buildTracesQuery(t *testing.T) {
want: "SELECT toFloat64(count(distinct(name))) as value from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND " +
"(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) having value > 10 order by value ASC",
},
{
name: "Test timestamp as aggregate attribute key",
args: args{
panelType: v3.PanelTypeTable,
start: 1680066360726210000,
end: 1680066458000000000,
mq: &v3.BuilderQuery{
AggregateOperator: v3.AggregateOperatorMax,
Filters: &v3.FilterSet{},
AggregateAttribute: v3.AttributeKey{Key: "timestamp", IsColumn: false, Type: v3.AttributeKeyTypeTag},
},
},
want: "SELECT max(toUnixTimestamp64Nano(timestamp)) as value from signoz_traces.distributed_signoz_index_v3 where (timestamp >= '1680066360726210000' AND timestamp <= '1680066458000000000') AND " +
"(ts_bucket_start >= 1680064560 AND ts_bucket_start <= 1680066458) order by value DESC",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {