mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 14:06:12 +08:00
chore: add range query impl for promql
This commit is contained in:
parent
982688ccc9
commit
77e44d7a1e
@ -14,6 +14,8 @@ var (
|
|||||||
CodeAlreadyExists = Code{"already_exists"}
|
CodeAlreadyExists = Code{"already_exists"}
|
||||||
CodeUnauthenticated = Code{"unauthenticated"}
|
CodeUnauthenticated = Code{"unauthenticated"}
|
||||||
CodeForbidden = Code{"forbidden"}
|
CodeForbidden = Code{"forbidden"}
|
||||||
|
CodeCanceled = Code{"canceled"}
|
||||||
|
CodeTimeout = Code{"timeout"}
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -9,6 +9,8 @@ var (
|
|||||||
TypeAlreadyExists = typ{"already-exists"}
|
TypeAlreadyExists = typ{"already-exists"}
|
||||||
TypeUnauthenticated = typ{"unauthenticated"}
|
TypeUnauthenticated = typ{"unauthenticated"}
|
||||||
TypeForbidden = typ{"forbidden"}
|
TypeForbidden = typ{"forbidden"}
|
||||||
|
TypeCanceled = typ{"canceled"}
|
||||||
|
TypeTimeout = typ{"timeout"}
|
||||||
)
|
)
|
||||||
|
|
||||||
// Defines custom error types
|
// Defines custom error types
|
||||||
|
@ -7,6 +7,12 @@ import (
|
|||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Non-standard status code (originally introduced by nginx) for the case when a client closes
|
||||||
|
// the connection while the server is still processing the request.
|
||||||
|
statusClientClosedConnection = 499
|
||||||
|
)
|
||||||
|
|
||||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||||
|
|
||||||
type response struct {
|
type response struct {
|
||||||
@ -60,6 +66,10 @@ func Error(rw http.ResponseWriter, cause error) {
|
|||||||
httpCode = http.StatusNotImplemented
|
httpCode = http.StatusNotImplemented
|
||||||
case errors.TypeForbidden:
|
case errors.TypeForbidden:
|
||||||
httpCode = http.StatusForbidden
|
httpCode = http.StatusForbidden
|
||||||
|
case errors.TypeCanceled:
|
||||||
|
httpCode = statusClientClosedConnection
|
||||||
|
case errors.TypeTimeout:
|
||||||
|
httpCode = http.StatusServiceUnavailable
|
||||||
}
|
}
|
||||||
|
|
||||||
rea := make([]responseerroradditional, len(a))
|
rea := make([]responseerroradditional, len(a))
|
||||||
|
@ -2,9 +2,14 @@ package querier
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/SigNoz/signoz/pkg/errors"
|
||||||
"github.com/SigNoz/signoz/pkg/prometheus"
|
"github.com/SigNoz/signoz/pkg/prometheus"
|
||||||
|
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||||
qbv5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
qbv5 "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||||
|
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||||
|
"github.com/prometheus/prometheus/promql"
|
||||||
)
|
)
|
||||||
|
|
||||||
type promqlQuery struct {
|
type promqlQuery struct {
|
||||||
@ -35,7 +40,90 @@ func (q *promqlQuery) Window() (uint64, uint64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) {
|
func (q *promqlQuery) Execute(ctx context.Context) (*qbv5.Result, error) {
|
||||||
// TODO: Implement this
|
|
||||||
//nolint:nilnil
|
start := int64(querybuilder.ToNanoSecs(q.tr.From))
|
||||||
return nil, nil
|
end := int64(querybuilder.ToNanoSecs(q.tr.To))
|
||||||
|
|
||||||
|
qry, err := q.promEngine.Engine().NewRangeQuery(
|
||||||
|
ctx,
|
||||||
|
q.promEngine.Storage(),
|
||||||
|
nil,
|
||||||
|
q.query.Query,
|
||||||
|
time.Unix(0, start),
|
||||||
|
time.Unix(0, end),
|
||||||
|
q.query.Step.Duration,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "invalid promql query %q", q.query.Query)
|
||||||
|
}
|
||||||
|
|
||||||
|
res := qry.Exec(ctx)
|
||||||
|
if res.Err != nil {
|
||||||
|
var eqc promql.ErrQueryCanceled
|
||||||
|
var eqt promql.ErrQueryTimeout
|
||||||
|
var es promql.ErrStorage
|
||||||
|
switch {
|
||||||
|
case errors.As(res.Err, &eqc):
|
||||||
|
return nil, errors.Newf(errors.TypeCanceled, errors.CodeCanceled, "query canceled")
|
||||||
|
case errors.As(res.Err, &eqt):
|
||||||
|
return nil, errors.Newf(errors.TypeTimeout, errors.CodeTimeout, "query timeout")
|
||||||
|
case errors.As(res.Err, &es):
|
||||||
|
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "query execution error: %v", res.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if errors.Is(res.Err, context.Canceled) {
|
||||||
|
return nil, errors.Newf(errors.TypeCanceled, errors.CodeCanceled, "query canceled")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "query execution error: %v", res.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer qry.Close()
|
||||||
|
|
||||||
|
matrix, promErr := res.Matrix()
|
||||||
|
if promErr != nil {
|
||||||
|
return nil, errors.WrapInternalf(promErr, errors.CodeInternal, "error getting matrix from promql query %q", q.query.Query)
|
||||||
|
}
|
||||||
|
|
||||||
|
var series []*qbv5.TimeSeries
|
||||||
|
for _, v := range matrix {
|
||||||
|
var s qbv5.TimeSeries
|
||||||
|
lbls := make([]*qbv5.Label, 0, len(v.Metric))
|
||||||
|
for k, v := range v.Metric.Copy().Map() {
|
||||||
|
lbls = append(lbls, &qbv5.Label{
|
||||||
|
Key: telemetrytypes.TelemetryFieldKey{Name: k},
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Labels = lbls
|
||||||
|
|
||||||
|
for idx := range v.Floats {
|
||||||
|
p := v.Floats[idx]
|
||||||
|
s.Values = append(s.Values, &qbv5.TimeSeriesValue{
|
||||||
|
Timestamp: p.T,
|
||||||
|
Value: p.F,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
series = append(series, &s)
|
||||||
|
}
|
||||||
|
|
||||||
|
warnings, _ := res.Warnings.AsStrings(q.query.Query, 10, 0)
|
||||||
|
|
||||||
|
return &qbv5.Result{
|
||||||
|
Type: qbv5.RequestTypeTimeSeries,
|
||||||
|
Value: []*qbv5.TimeSeriesData{
|
||||||
|
{
|
||||||
|
QueryName: q.query.Name,
|
||||||
|
Aggregations: []*qbv5.AggregationBucket{
|
||||||
|
{
|
||||||
|
Series: series,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Warnings: warnings,
|
||||||
|
// TODO: map promql stats?
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -7,4 +7,8 @@ type PromQuery struct {
|
|||||||
Query string `json:"query"`
|
Query string `json:"query"`
|
||||||
// disabled if true, the query will not be executed
|
// disabled if true, the query will not be executed
|
||||||
Disabled bool `json:"disabled"`
|
Disabled bool `json:"disabled"`
|
||||||
|
// step size for the query
|
||||||
|
Step Step `json:"step"`
|
||||||
|
// stats if true, the query will return stats
|
||||||
|
Stats bool `json:"stats"`
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user