From ed5d217c762ec2f234ca3ed2d3ae8d6040de6424 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Wed, 13 Jul 2022 15:42:13 +0530 Subject: [PATCH] API for filtering and paginating logs added --- .../app/clickhouseReader/reader.go | 35 ++++++ pkg/query-service/app/http_handler.go | 29 ++++- pkg/query-service/app/logs/parser.go | 100 ++++++++++++++++++ pkg/query-service/app/logs/validator.go | 25 +++++ pkg/query-service/interfaces/interface.go | 1 + pkg/query-service/model/queryParams.go | 13 +++ pkg/query-service/model/response.go | 16 +++ 7 files changed, 216 insertions(+), 3 deletions(-) create mode 100644 pkg/query-service/app/logs/parser.go diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index bfcdbe4368..ac41d049b0 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -47,6 +47,7 @@ import ( "github.com/jmoiron/sqlx" promModel "github.com/prometheus/common/model" + "go.signoz.io/query-service/app/logs" "go.signoz.io/query-service/constants" am "go.signoz.io/query-service/integrations/alertManager" "go.signoz.io/query-service/model" @@ -3079,3 +3080,37 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda } return nil } + +func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) { + response := &[]model.GetLogsResponse{} + fields, apiErr := r.GetLogFields(ctx) + if apiErr != nil { + return nil, apiErr + } + + filterSql, err := logs.ParseLogFilter(fields, ¶ms.Filters) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData} + } + + query := fmt.Sprintf("SELECT "+ + "timestamp, observed_timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,"+ + "CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,"+ + "CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,"+ + "CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,"+ + "CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "+ + "from %s.%s", r.logsDB, r.logsTable) + + if filterSql != nil && *filterSql != "" { + query += fmt.Sprintf(" where %s", *filterSql) + } + + query = fmt.Sprintf("%s order by %s %s limit %d", query, params.OrderBy, params.Order, params.Limit) + zap.S().Debug(query) + err = r.db.Select(ctx, response, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + return response, nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index dd752fa376..c4ebeccaa9 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1821,15 +1821,15 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response // logs func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) { subRouter := router.PathPrefix("/api/v1/logs").Subrouter() + subRouter.HandleFunc("", ViewAccess(aH.getLogs)).Methods(http.MethodGet) subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet) subRouter.HandleFunc("/fields", ViewAccess(aH.logFieldUpdate)).Methods(http.MethodPost) } func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) { - fields, apiErr := (*aH.reader).GetLogFields(r.Context()) if apiErr != nil { - respondError(w, apiErr, "Failed to fetch org from the DB") + respondError(w, apiErr, "Failed to fetch fields from the DB") return } aH.writeJSON(w, r, fields) @@ -1852,8 +1852,31 @@ func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) { apiErr := (*aH.reader).UpdateLogField(r.Context(), &field) if apiErr != nil { - respondError(w, apiErr, "Failed to fetch org from the DB") + respondError(w, apiErr, "Failed to update filed in the DB") return } aH.writeJSON(w, r, field) } + +func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) { + params, err := logs.ParseFilterParams(r) + if err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + respondError(w, apiErr, "Incorrect params") + return + } + + err = logs.ValidateFilters(¶ms.Filters) + if err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + respondError(w, apiErr, "Incorrect filter") + return + } + + res, apiErr := (*aH.reader).GetLogs(r.Context(), params) + if apiErr != nil { + respondError(w, apiErr, "Failed to fetch logs from the DB") + return + } + aH.writeJSON(w, r, map[string]interface{}{"results": res}) +} diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go new file mode 100644 index 0000000000..a7975cdd8d --- /dev/null +++ b/pkg/query-service/app/logs/parser.go @@ -0,0 +1,100 @@ +package logs + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + + "go.signoz.io/query-service/model" +) + +var operatorMapping = map[string]string{ + "eq": "=", + "neq": "!=", + "lt": "<", + "gt": ">", + "lte": "<=", + "gte": ">=", + "in": "in", + "like": "like", + "ilike": "ilike", +} + +func arrayToMap(fields []model.LogField) map[string]model.LogField { + res := map[string]model.LogField{} + for _, field := range fields { + res[field.Name] = field + } + return res +} + +func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) { + res := model.LogsFilterParams{ + Limit: 30, + OrderBy: "timestamp", + Order: "desc", + } + var err error + params := r.URL.Query() + filters := []model.LogFilter{} + if val, ok := params["limit"]; ok { + res.Limit, err = strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + } + if val, ok := params["orderBy"]; ok { + res.OrderBy = val[0] + } + if val, ok := params["order"]; ok { + res.Order = val[0] + } + if val, ok := params["filter"]; ok { + err := json.Unmarshal([]byte(val[0]), &filters) + if err != nil { + return nil, err + } + } + res.Filters = filters + return &res, nil +} + +func ParseLogFilter(allFields *model.GetFieldsResponse, filters *[]model.LogFilter) (*string, error) { + fLen := len(*filters) + if fLen <= 0 { + return nil, nil + } + + selectedFieldsLookup := arrayToMap(allFields.Selected) + interestingFieldLookup := arrayToMap(allFields.Interesting) + filterSql := "" + for fIndx := 0; fIndx < fLen; fIndx++ { + filter := (*filters)[fIndx] + fieldSQLName := filter.Column + if _, ok := selectedFieldsLookup[filter.Column]; !ok { + if field, ok := interestingFieldLookup[filter.Column]; ok { + fieldSQLName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), filter.Column) + } else { + return nil, fmt.Errorf("field not found for filtering") + } + } + + filterSql += "(" + vLen := len(filter.Value) + for i := 0; i < vLen; i++ { + filterSql += fmt.Sprintf("%s%s'%v'", fieldSQLName, operatorMapping[filter.Operation], filter.Value[i]) + if i != vLen-1 { + filterSql += " or " + } + } + filterSql += ")" + + if fIndx != fLen-1 { + filterSql += " and " + } + } + + return &filterSql, nil +} diff --git a/pkg/query-service/app/logs/validator.go b/pkg/query-service/app/logs/validator.go index 9277bda047..99858969e8 100644 --- a/pkg/query-service/app/logs/validator.go +++ b/pkg/query-service/app/logs/validator.go @@ -38,3 +38,28 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error { } return nil } + +func ValidateFilters(filters *[]model.LogFilter) error { + opsRegex := "^(gte|lte|gt|lt|eq|neq|in|like|ilike)$" + regex, err := regexp.Compile(opsRegex) + if err != nil { + return err + } + for _, val := range *filters { + if val.Column == "" { + return fmt.Errorf("col cannot be empty") + } + if val.Operation == "" { + return fmt.Errorf("op cannot be empty") + } + if len(val.Value) == 0 { + return fmt.Errorf("val cannot be empty") + } + + matched := regex.MatchString(val.Operation) + if !matched { + return fmt.Errorf("op type %s not supported", val.Operation) + } + } + return nil +} diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 0bf1811b0c..bb5a428a0e 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -63,4 +63,5 @@ type Reader interface { // Logs GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError + GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) } diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index ec2032a06e..c39c3eb802 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -312,3 +312,16 @@ type UpdateField struct { IndexType *string `json:"index"` IndexGranularity *int `json:"indexGranularity"` } + +type LogFilter struct { + Column string `json:"col"` + Operation string `json:"op"` + Value []interface{} `json:"val"` +} + +type LogsFilterParams struct { + Limit int `json:"limit"` + OrderBy string `json:"orderBy"` + Order string `json:"order"` + Filters []LogFilter `json:"filters"` +} diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index f61132bc94..f6120f34af 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -388,3 +388,19 @@ type GetFieldsResponse struct { Selected []LogField `json:"selected"` Interesting []LogField `json:"interesting"` } + +type GetLogsResponse struct { + Timestamp uint64 `json:"timestamp" ch:"timestamp"` + ObservedTimestamp uint64 `json:"observedTimestamp" ch:"observed_timestamp"` + ID string `json:"id" ch:"id"` + TraceID string `json:"traceId" ch:"trace_id"` + SpanID string `json:"spanId" ch:"span_id"` + TraceFlags uint32 `json:"traceFlags" ch:"trace_flags"` + SeverityText string `json:"severityText" ch:"severity_text"` + SeverityNumber int32 `json:"severityNumber" ch:"severity_number"` + Body string `json:"body" ch:"body"` + Resources_string map[string]string `json:"resourcesString" ch:"resources_string"` + Attributes_string map[string]string `json:"attributesString" ch:"attributes_string"` + Attributes_int64 map[string]int64 `json:"attributesInt" ch:"attributes_int64"` + Attributes_float64 map[string]float64 `json:"attributesFloat" ch:"attributes_float64"` +}