API for filtering and paginating logs added
parent ef141d2cee
commit ed5d217c76
@@ -47,6 +47,7 @@ import (
	"github.com/jmoiron/sqlx"
	promModel "github.com/prometheus/common/model"
	"go.signoz.io/query-service/app/logs"
	"go.signoz.io/query-service/constants"
	am "go.signoz.io/query-service/integrations/alertManager"
	"go.signoz.io/query-service/model"
@@ -3079,3 +3080,37 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
	}
	return nil
}

func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) {
	response := &[]model.GetLogsResponse{}
	fields, apiErr := r.GetLogFields(ctx)
	if apiErr != nil {
		return nil, apiErr
	}

	filterSql, err := logs.ParseLogFilter(fields, &params.Filters)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData}
	}

	query := fmt.Sprintf("SELECT "+
		"timestamp, observed_timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,"+
		"CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,"+
		"CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,"+
		"CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,"+
		"CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "+
		"from %s.%s", r.logsDB, r.logsTable)

	if filterSql != nil && *filterSql != "" {
		query += fmt.Sprintf(" where %s", *filterSql)
	}

	query = fmt.Sprintf("%s order by %s %s limit %d", query, params.OrderBy, params.Order, params.Limit)
	zap.S().Debug(query)
	err = r.db.Select(ctx, response, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}

	return response, nil
}
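For a concrete sense of the statement this assembles (an illustrative example: the database and table names signoz_logs and logs are assumptions, since the real values come from r.logsDB and r.logsTable), a request with the default limit/order and the single filter [{"col":"severity_text","op":"eq","val":["ERROR"]}] would yield roughly: SELECT timestamp, observed_timestamp, id, ..., resources_string from signoz_logs.logs where (severity_text='ERROR') order by timestamp desc limit 30. The where fragment comes from logs.ParseLogFilter in the new parser.go below.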
@@ -1821,15 +1821,15 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response
// logs
func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) {
	subRouter := router.PathPrefix("/api/v1/logs").Subrouter()
	subRouter.HandleFunc("", ViewAccess(aH.getLogs)).Methods(http.MethodGet)
	subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet)
	subRouter.HandleFunc("/fields", ViewAccess(aH.logFieldUpdate)).Methods(http.MethodPost)
}

func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) {

	fields, apiErr := (*aH.reader).GetLogFields(r.Context())
	if apiErr != nil {
-		respondError(w, apiErr, "Failed to fetch org from the DB")
+		respondError(w, apiErr, "Failed to fetch fields from the DB")
		return
	}
	aH.writeJSON(w, r, fields)
@@ -1852,8 +1852,31 @@ func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) {

	apiErr := (*aH.reader).UpdateLogField(r.Context(), &field)
	if apiErr != nil {
-		respondError(w, apiErr, "Failed to fetch org from the DB")
+		respondError(w, apiErr, "Failed to update field in the DB")
		return
	}
	aH.writeJSON(w, r, field)
}

func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
	params, err := logs.ParseFilterParams(r)
	if err != nil {
		apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
		respondError(w, apiErr, "Incorrect params")
		return
	}

	err = logs.ValidateFilters(&params.Filters)
	if err != nil {
		apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
		respondError(w, apiErr, "Incorrect filter")
		return
	}

	res, apiErr := (*aH.reader).GetLogs(r.Context(), params)
	if apiErr != nil {
		respondError(w, apiErr, "Failed to fetch logs from the DB")
		return
	}
	aH.writeJSON(w, r, map[string]interface{}{"results": res})
}
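Taken together, the route and handler above mean the endpoint is driven entirely by query parameters. A minimal client sketch (the base URL localhost:8080 and the sample filter values are assumptions; the parameter names limit, orderBy, order, filter and the col/op/val filter shape come from ParseFilterParams and model.LogFilter below):

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// The filter parameter is a JSON array of {"col", "op", "val"} objects (model.LogFilter).
	filter := `[{"col":"severity_text","op":"in","val":["ERROR","FATAL"]}]`

	q := url.Values{}
	q.Set("limit", "50")          // defaults to 30 if omitted
	q.Set("orderBy", "timestamp") // default "timestamp"
	q.Set("order", "desc")        // default "desc"
	q.Set("filter", filter)

	// Base URL is an assumption; point it at a running query-service instance.
	resp, err := http.Get("http://localhost:8080/api/v1/logs?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // the handler responds with {"results": [...]}
}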
pkg/query-service/app/logs/parser.go (new file, 100 lines)
@@ -0,0 +1,100 @@
package logs

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strconv"
	"strings"

	"go.signoz.io/query-service/model"
)

var operatorMapping = map[string]string{
	"eq":    "=",
	"neq":   "!=",
	"lt":    "<",
	"gt":    ">",
	"lte":   "<=",
	"gte":   ">=",
	"in":    "in",
	"like":  "like",
	"ilike": "ilike",
}

func arrayToMap(fields []model.LogField) map[string]model.LogField {
	res := map[string]model.LogField{}
	for _, field := range fields {
		res[field.Name] = field
	}
	return res
}

func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
	res := model.LogsFilterParams{
		Limit:   30,
		OrderBy: "timestamp",
		Order:   "desc",
	}
	var err error
	params := r.URL.Query()
	filters := []model.LogFilter{}
	if val, ok := params["limit"]; ok {
		res.Limit, err = strconv.Atoi(val[0])
		if err != nil {
			return nil, err
		}
	}
	if val, ok := params["orderBy"]; ok {
		res.OrderBy = val[0]
	}
	if val, ok := params["order"]; ok {
		res.Order = val[0]
	}
	if val, ok := params["filter"]; ok {
		err := json.Unmarshal([]byte(val[0]), &filters)
		if err != nil {
			return nil, err
		}
	}
	res.Filters = filters
	return &res, nil
}

func ParseLogFilter(allFields *model.GetFieldsResponse, filters *[]model.LogFilter) (*string, error) {
	fLen := len(*filters)
	if fLen <= 0 {
		return nil, nil
	}

	selectedFieldsLookup := arrayToMap(allFields.Selected)
	interestingFieldLookup := arrayToMap(allFields.Interesting)
	filterSql := ""
	for fIndx := 0; fIndx < fLen; fIndx++ {
		filter := (*filters)[fIndx]
		fieldSQLName := filter.Column
		if _, ok := selectedFieldsLookup[filter.Column]; !ok {
			if field, ok := interestingFieldLookup[filter.Column]; ok {
				fieldSQLName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), filter.Column)
			} else {
				return nil, fmt.Errorf("field not found for filtering")
			}
		}

		filterSql += "("
		vLen := len(filter.Value)
		for i := 0; i < vLen; i++ {
			filterSql += fmt.Sprintf("%s%s'%v'", fieldSQLName, operatorMapping[filter.Operation], filter.Value[i])
			if i != vLen-1 {
				filterSql += " or "
			}
		}
		filterSql += ")"

		if fIndx != fLen-1 {
			filterSql += " and "
		}
	}

	return &filterSql, nil
}
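To make the clause construction concrete, here is a standalone sketch that mirrors the loop above for one sample filter set. The local Filter type, the trimmed operator map, and the hard-coded "method is an interesting attribute" check are illustrative stand-ins, not the model package or the real field lookup; the attributes_string_* column expression assumes an interesting field of Type "attributes" and DataType "String".

package main

import (
	"fmt"
	"strings"
)

// Filter mirrors the shape of model.LogFilter ({"col", "op", "val"}).
type Filter struct {
	Column    string
	Operation string
	Value     []interface{}
}

var operatorMapping = map[string]string{"eq": "=", "neq": "!="}

func main() {
	// One selected (top-level) column and one "interesting" attribute column.
	filters := []Filter{
		{Column: "severity_text", Operation: "eq", Value: []interface{}{"ERROR", "FATAL"}},
		{Column: "method", Operation: "eq", Value: []interface{}{"GET"}},
	}

	clauses := []string{}
	for _, f := range filters {
		col := f.Column
		if f.Column == "method" {
			// Interesting fields are addressed through the key/value arrays,
			// e.g. attributes_string_value[indexOf(attributes_string_key, 'method')].
			col = fmt.Sprintf("attributes_string_value[indexOf(attributes_string_key, '%s')]", f.Column)
		}
		parts := []string{}
		for _, v := range f.Value {
			parts = append(parts, fmt.Sprintf("%s%s'%v'", col, operatorMapping[f.Operation], v))
		}
		clauses = append(clauses, "("+strings.Join(parts, " or ")+")")
	}
	// Values within a filter are OR'ed, filters themselves are AND'ed:
	// (severity_text='ERROR' or severity_text='FATAL') and (attributes_string_value[indexOf(attributes_string_key, 'method')]='GET')
	fmt.Println(strings.Join(clauses, " and "))
}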
@@ -38,3 +38,28 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error {
	}
	return nil
}

func ValidateFilters(filters *[]model.LogFilter) error {
	opsRegex := "^(gte|lte|gt|lt|eq|neq|in|like|ilike)$"
	regex, err := regexp.Compile(opsRegex)
	if err != nil {
		return err
	}
	for _, val := range *filters {
		if val.Column == "" {
			return fmt.Errorf("col cannot be empty")
		}
		if val.Operation == "" {
			return fmt.Errorf("op cannot be empty")
		}
		if len(val.Value) == 0 {
			return fmt.Errorf("val cannot be empty")
		}

		matched := regex.MatchString(val.Operation)
		if !matched {
			return fmt.Errorf("op type %s not supported", val.Operation)
		}
	}
	return nil
}
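For example (illustrative values): a filter like {"col":"severity_text","op":"contains","val":["ERR"]} is rejected with "op type contains not supported", since contains is not in the operator allow-list, while an empty col, op, or val fails before the operator check is reached.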
@@ -63,4 +63,5 @@ type Reader interface {
	// Logs
	GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
	UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
	GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError)
}
@@ -312,3 +312,16 @@ type UpdateField struct {
	IndexType        *string `json:"index"`
	IndexGranularity *int    `json:"indexGranularity"`
}

type LogFilter struct {
	Column    string        `json:"col"`
	Operation string        `json:"op"`
	Value     []interface{} `json:"val"`
}

type LogsFilterParams struct {
	Limit   int         `json:"limit"`
	OrderBy string      `json:"orderBy"`
	Order   string      `json:"order"`
	Filters []LogFilter `json:"filters"`
}
@@ -388,3 +388,19 @@ type GetFieldsResponse struct {
	Selected    []LogField `json:"selected"`
	Interesting []LogField `json:"interesting"`
}

type GetLogsResponse struct {
	Timestamp          uint64             `json:"timestamp" ch:"timestamp"`
	ObservedTimestamp  uint64             `json:"observedTimestamp" ch:"observed_timestamp"`
	ID                 string             `json:"id" ch:"id"`
	TraceID            string             `json:"traceId" ch:"trace_id"`
	SpanID             string             `json:"spanId" ch:"span_id"`
	TraceFlags         uint32             `json:"traceFlags" ch:"trace_flags"`
	SeverityText       string             `json:"severityText" ch:"severity_text"`
	SeverityNumber     int32              `json:"severityNumber" ch:"severity_number"`
	Body               string             `json:"body" ch:"body"`
	Resources_string   map[string]string  `json:"resourcesString" ch:"resources_string"`
	Attributes_string  map[string]string  `json:"attributesString" ch:"attributes_string"`
	Attributes_int64   map[string]int64   `json:"attributesInt" ch:"attributes_int64"`
	Attributes_float64 map[string]float64 `json:"attributesFloat" ch:"attributes_float64"`
}