diff --git a/pkg/errors/code.go b/pkg/errors/code.go index 8457aa5575..db1e167f0e 100644 --- a/pkg/errors/code.go +++ b/pkg/errors/code.go @@ -14,6 +14,8 @@ var ( CodeAlreadyExists = Code{"already_exists"} CodeUnauthenticated = Code{"unauthenticated"} CodeForbidden = Code{"forbidden"} + CodeCanceled = Code{"canceled"} + CodeTimeout = Code{"timeout"} ) var ( diff --git a/pkg/errors/type.go b/pkg/errors/type.go index 2afdf4573e..3663f9df66 100644 --- a/pkg/errors/type.go +++ b/pkg/errors/type.go @@ -9,6 +9,8 @@ var ( TypeAlreadyExists = typ{"already-exists"} TypeUnauthenticated = typ{"unauthenticated"} TypeForbidden = typ{"forbidden"} + TypeCanceled = typ{"canceled"} + TypeTimeout = typ{"timeout"} ) // Defines custom error types diff --git a/pkg/http/render/render.go b/pkg/http/render/render.go index a3021e4b48..c11ff5b4df 100644 --- a/pkg/http/render/render.go +++ b/pkg/http/render/render.go @@ -7,6 +7,12 @@ import ( jsoniter "github.com/json-iterator/go" ) +const ( + // Non-standard status code (originally introduced by nginx) for the case when a client closes + // the connection while the server is still processing the request. + statusClientClosedConnection = 499 +) + var json = jsoniter.ConfigCompatibleWithStandardLibrary type response struct { @@ -60,6 +66,10 @@ func Error(rw http.ResponseWriter, cause error) { httpCode = http.StatusNotImplemented case errors.TypeForbidden: httpCode = http.StatusForbidden + case errors.TypeCanceled: + httpCode = statusClientClosedConnection + case errors.TypeTimeout: + httpCode = http.StatusServiceUnavailable } rea := make([]responseerroradditional, len(a)) diff --git a/pkg/querier/promql_query.go b/pkg/querier/promql_query.go index 2934563749..b5d3fc9e88 100644 --- a/pkg/querier/promql_query.go +++ b/pkg/querier/promql_query.go @@ -2,9 +2,14 @@ package querier import ( "context" + "time" + "github.com/SigNoz/signoz/pkg/errors" "github.com/SigNoz/signoz/pkg/prometheus" + "github.com/SigNoz/signoz/pkg/querybuilder" qbv5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5" + "github.com/SigNoz/signoz/pkg/types/telemetrytypes" + "github.com/prometheus/prometheus/promql" ) type promqlQuery struct { @@ -35,7 +40,90 @@ func (q *promqlQuery) Window() (uint64, uint64) { } func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) { - // TODO: Implement this - //nolint:nilnil - return nil, nil + + start := int64(querybuilder.ToNanoSecs(q.tr.From)) + end := int64(querybuilder.ToNanoSecs(q.tr.To)) + + qry, err := q.promEngine.Engine().NewRangeQuery( + ctx, + q.promEngine.Storage(), + nil, + q.query.Query, + time.Unix(0, start), + time.Unix(0, end), + q.query.Step.Duration, + ) + + if err != nil { + return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query %q", q.query.Query) + } + + res := qry.Exec(ctx) + if res.Err != nil { + var eqc promql.ErrQueryCanceled + var eqt promql.ErrQueryTimeout + var es promql.ErrStorage + switch { + case errors.As(res.Err, &eqc): + return nil, errors.Newf(errors.TypeCanceled, errors.CodeCanceled, "query canceled") + case errors.As(res.Err, &eqt): + return nil, errors.Newf(errors.TypeTimeout, errors.CodeTimeout, "query timeout") + case errors.As(res.Err, &es): + return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "query execution error: %v", res.Err) + } + + if errors.Is(res.Err, context.Canceled) { + return nil, errors.Newf(errors.TypeCanceled, errors.CodeCanceled, "query canceled") + } + + return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "query execution error: %v", res.Err) + } + + defer qry.Close() + + matrix, promErr := res.Matrix() + if promErr != nil { + return nil, errors.WrapInternalf(promErr, errors.CodeInternal, "error getting matrix from promql query %q", q.query.Query) + } + + var series []*qbv5.TimeSeries + for _, v := range matrix { + var s qbv5.TimeSeries + lbls := make([]*qbv5.Label, 0, len(v.Metric)) + for k, v := range v.Metric.Copy().Map() { + lbls = append(lbls, &qbv5.Label{ + Key: telemetrytypes.TelemetryFieldKey{Name: k}, + Value: v, + }) + } + + s.Labels = lbls + + for idx := range v.Floats { + p := v.Floats[idx] + s.Values = append(s.Values, &qbv5.TimeSeriesValue{ + Timestamp: p.T, + Value: p.F, + }) + } + series = append(series, &s) + } + + warnings, _ := res.Warnings.AsStrings(q.query.Query, 10, 0) + + return &qbv5.Result{ + Type: qbv5.RequestTypeTimeSeries, + Value: []*qbv5.TimeSeriesData{ + { + QueryName: q.query.Name, + Aggregations: []*qbv5.AggregationBucket{ + { + Series: series, + }, + }, + }, + }, + Warnings: warnings, + // TODO: map promql stats? + }, nil } diff --git a/pkg/types/querybuildertypes/querybuildertypesv5/prom_query.go b/pkg/types/querybuildertypes/querybuildertypesv5/prom_query.go index 509af727ea..522ad3e0e6 100644 --- a/pkg/types/querybuildertypes/querybuildertypesv5/prom_query.go +++ b/pkg/types/querybuildertypes/querybuildertypesv5/prom_query.go @@ -7,4 +7,8 @@ type PromQuery struct { Query string `json:"query"` // disabled if true, the query will not be executed Disabled bool `json:"disabled"` + // step size for the query + Step Step `json:"step"` + // stats if true, the query will return stats + Stats bool `json:"stats"` }