Filtering logic updated

This commit is contained in:
nityanandagohain 2022-07-18 16:37:46 +05:30
parent ed5d217c76
commit 2e9affa80c
9 changed files with 430 additions and 101 deletions

View File

@ -3088,7 +3088,7 @@ func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilter
return nil, apiErr
}
filterSql, err := logs.ParseLogFilter(fields, &params.Filters)
filterSql, err := logs.GenerateSQLWhere(fields, params)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData}
}
@ -3114,3 +3114,21 @@ func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilter
return response, nil
}
// TailLogs streams placeholder log lines to the given tail client until the
// context is cancelled or ten lines have been sent, then signals completion
// on client.Done. NOTE(review): this is clearly stub/demo data ("hello log");
// the real ClickHouse tailing is presumably implemented later.
func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) *model.ApiError {
	for i := 0; i < 10; i++ {
		data := fmt.Sprintf("hello log %d", i)
		// Honor cancellation even while the consumer is slow to drain Logs;
		// a bare `client.Logs <- &data` could block this goroutine forever
		// after the HTTP handler has gone away.
		select {
		case <-ctx.Done():
			return nil
		case client.Logs <- &data:
		}
		// Sleep between lines, but wake immediately on cancellation instead
		// of blocking a full second.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(time.Second):
		}
	}
	done := true
	// The Done send must also respect cancellation: if the handler already
	// returned, nobody will ever receive on the unbuffered channel.
	select {
	case <-ctx.Done():
	case client.Done <- &done:
	}
	return nil
}

View File

@ -1,6 +1,7 @@
package app
import (
"bytes"
"context"
"encoding/json"
"errors"
@ -1822,6 +1823,7 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response
// RegisterLogsRoutes attaches the logs API endpoints under /api/v1/logs:
// listing, live tailing, and field metadata read/update.
func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) {
	logsRouter := router.PathPrefix("/api/v1/logs").Subrouter()
	logsRouter.HandleFunc("", ViewAccess(aH.getLogs)).Methods(http.MethodGet)
	logsRouter.HandleFunc("/tail", ViewAccess(aH.tailLogs)).Methods(http.MethodGet)
	logsRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet)
	logsRouter.HandleFunc("/fields", ViewAccess(aH.logFieldUpdate)).Methods(http.MethodPost)
}
@ -1866,13 +1868,6 @@ func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
return
}
err = logs.ValidateFilters(&params.Filters)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Incorrect filter")
return
}
res, apiErr := (*aH.reader).GetLogs(r.Context(), params)
if apiErr != nil {
respondError(w, apiErr, "Failed to fetch logs from the DB")
@ -1880,3 +1875,37 @@ func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
}
aH.writeJSON(w, r, map[string]interface{}{"results": res})
}
// tailLogs serves GET /api/v1/logs/tail as a server-sent-events stream: it
// starts the reader's TailLogs in a goroutine and forwards each received log
// line as an SSE `data:` frame until the reader signals Done or the client
// disconnects.
func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
	// Verify streaming support BEFORE committing any response headers; once
	// WriteHeader has been called, respondError can no longer set a proper
	// error status code.
	flusher, ok := w.(http.Flusher)
	if !ok {
		err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil}
		respondError(w, &err, "streaming is not supported")
		return
	}

	client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *string, 100), Done: make(chan *bool)}
	go (*aH.reader).TailLogs(r.Context(), client)

	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.WriteHeader(200)

	for {
		select {
		case ev := <-client.Logs:
			var buf bytes.Buffer
			enc := json.NewEncoder(&buf)
			if err := enc.Encode(ev); err != nil {
				// Skip an unencodable event rather than corrupt the SSE stream.
				continue
			}
			fmt.Fprintf(w, "data: %v\n\n", buf.String())
			flusher.Flush()
		case <-client.Done:
			return
		case <-r.Context().Done():
			// Client disconnected; stop waiting so this handler goroutine
			// does not leak.
			return
		}
	}
}

View File

@ -1,9 +1,9 @@
package logs
import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
@ -11,24 +11,18 @@ import (
)
// operatorMapping translates the operator keywords accepted by the log filter
// query language into their SQL equivalents. (The diff residue in the original
// left duplicate keys — lt/gt/lte/gte/in appeared twice — which is a compile
// error in Go; this is the deduplicated intended mapping.)
// NOTE(review): eq/neq are kept for backward compatibility although the token
// regex below only recognizes in|nin|gt|lt|gte|lte|contains|ncontains — confirm
// they are still wanted.
var operatorMapping = map[string]string{
	"eq":        "=",
	"neq":       "!=",
	"lt":        "<",
	"gt":        ">",
	"lte":       "<=",
	"gte":       ">=",
	"in":        "IN",
	"nin":       "NOT IN",
	"contains":  "ILIKE",
	"ncontains": "NOT ILIKE",
}
// arrayToMap indexes a list of log fields by field name so later lookups
// (selected/interesting field checks) are O(1) instead of a linear scan.
func arrayToMap(fields []model.LogField) map[string]model.LogField {
	res := map[string]model.LogField{}
	for _, field := range fields {
		res[field.Name] = field
	}
	return res
}
// tokenRegex matches one filter expression of the query language — either
// `col in|nin (list)` or `col gt|lt|gte|lte|contains|ncontains value` — with
// an optional leading "and" connector kept attached to its filter.
// MustCompile replaces the original `regexp.Compile` whose error was silently
// discarded with `_`; the patterns are constants, so a failure here is a
// programmer bug and should panic at init.
var tokenRegex = regexp.MustCompile(`(?i)(and( )*?)?(([\w.-]+ (in|nin) \(["\w.,' \-><]+\))|([\w.-]+ (gt|lt|gte|lte|contains|ncontains) ("|')?\S+("|')?))`)

// operatorRegex extracts the space-delimited operator keyword from a single
// matched filter token.
var operatorRegex = regexp.MustCompile(`(?i)(?: )(in|nin|gt|lt|gte|lte|contains|ncontains)(?: )`)
func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
res := model.LogsFilterParams{
@ -38,7 +32,6 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
}
var err error
params := r.URL.Query()
filters := []model.LogFilter{}
if val, ok := params["limit"]; ok {
res.Limit, err = strconv.Atoi(val[0])
if err != nil {
@ -51,50 +44,144 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
if val, ok := params["order"]; ok {
res.Order = val[0]
}
if val, ok := params["filter"]; ok {
err := json.Unmarshal([]byte(val[0]), &filters)
if val, ok := params["q"]; ok {
res.Query = &val[0]
}
if val, ok := params["timestampStart"]; ok {
ts, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
ts64 := int64(ts)
res.TimestampStart = &ts64
}
if val, ok := params["timestampEnd"]; ok {
ts, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
ts64 := int64(ts)
res.TimestampEnd = &ts64
}
if val, ok := params["idStart"]; ok {
res.IdStart = &val[0]
}
if val, ok := params["idEnd"]; ok {
res.IdEnd = &val[0]
}
res.Filters = filters
return &res, nil
}
func ParseLogFilter(allFields *model.GetFieldsResponse, filters *[]model.LogFilter) (*string, error) {
fLen := len(*filters)
if fLen <= 0 {
return nil, nil
// parseLogQuery splits a user query into SQL predicate tokens. A query with
// no recognizable filter expressions is treated as a full-text search over
// the log body; contains/ncontains filters are rewritten to ILIKE/NOT ILIKE
// on body, and all other operators are substituted via operatorMapping.
// Trailing spaces in the produced tokens are intentional: callers join them
// with no separator.
func parseLogQuery(query string) ([]string, error) {
	matches := tokenRegex.FindAllString(query, -1)

	// No filter syntax at all: full-text search on the whole query string.
	if len(matches) == 0 {
		return []string{fmt.Sprintf("body ILIKE '%%%s%%' ", query)}, nil
	}

	// Strip every recognized filter; anything non-blank left over means the
	// query mixes in syntax we cannot parse.
	leftover := tokenRegex.ReplaceAllString(query, "")
	if strings.TrimSpace(leftover) != "" {
		return nil, fmt.Errorf("failed to parse query, contains unknown tokens")
	}

	sqlTokens := make([]string, 0, len(matches))
	for _, token := range matches {
		op := strings.TrimSpace(operatorRegex.FindString(token))
		switch strings.ToLower(op) {
		case "contains":
			needle := strings.TrimSpace(strings.Split(token, op)[1])
			// needle[1:len(needle)-1] drops the surrounding quotes.
			sqlTokens = append(sqlTokens, fmt.Sprintf(`AND body ILIKE '%%%s%%' `, needle[1:len(needle)-1]))
		case "ncontains":
			needle := strings.TrimSpace(strings.Split(token, op)[1])
			sqlTokens = append(sqlTokens, fmt.Sprintf(`AND body NOT ILIKE '%%%s%%' `, needle[1:len(needle)-1]))
		default:
			symbol := operatorMapping[strings.ToLower(op)]
			sqlTokens = append(sqlTokens, strings.Replace(token, " "+op+" ", " "+symbol+" ", 1)+" ")
		}
	}
	return sqlTokens, nil
}
// parseColumn extracts the (lowercased) column name from a single filter
// token such as `and id_userid > 50` or `service IN ('x')`.
//
// Returns an error when the token has fewer than three space-separated parts,
// since it then cannot form a <column> <operator> <value> filter.
func parseColumn(s string) (*string, error) {
	s = strings.ToLower(s)
	parts := strings.Split(s, " ")
	if len(parts) < 3 {
		return nil, fmt.Errorf("incorrect filter")
	}
	// Tokens chained with the "and" connector carry the column in the second
	// field. Match "and " (with the trailing space) so that column names that
	// merely start with "and" (e.g. "android") are not misparsed — the
	// original HasPrefix(s, "and") returned the operator for such columns.
	colName := parts[0]
	if strings.HasPrefix(s, "and ") {
		colName = parts[1]
	}
	return &colName, nil
}
// arrayToMap builds a name-keyed lookup table from a slice of log fields.
func arrayToMap(fields []model.LogField) map[string]model.LogField {
	lookup := make(map[string]model.LogField, len(fields))
	for _, f := range fields {
		lookup[f.Name] = f
	}
	return lookup
}
func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens []string) ([]string, error) {
// check if cols
selectedFieldsLookup := arrayToMap(allFields.Selected)
interestingFieldLookup := arrayToMap(allFields.Interesting)
filterSql := ""
for fIndx := 0; fIndx < fLen; fIndx++ {
filter := (*filters)[fIndx]
fieldSQLName := filter.Column
if _, ok := selectedFieldsLookup[filter.Column]; !ok {
if field, ok := interestingFieldLookup[filter.Column]; ok {
fieldSQLName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), filter.Column)
for index := 0; index < len(queryTokens); index++ {
queryToken := queryTokens[index]
col, err := parseColumn(queryToken)
if err != nil {
return nil, err
}
sqlColName := *col
if _, ok := selectedFieldsLookup[*col]; !ok && *col != "body" {
if field, ok := interestingFieldLookup[*col]; ok {
sqlColName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), *col)
} else {
return nil, fmt.Errorf("field not found for filtering")
}
}
queryTokens[index] = strings.Replace(queryToken, *col, sqlColName, 1)
}
return queryTokens, nil
}
filterSql += "("
vLen := len(filter.Value)
for i := 0; i < vLen; i++ {
filterSql += fmt.Sprintf("%s%s'%v'", fieldSQLName, operatorMapping[filter.Operation], filter.Value[i])
if i != vLen-1 {
filterSql += " or "
}
}
filterSql += ")"
if fIndx != fLen-1 {
filterSql += " and "
}
// GenerateSQLWhere builds the WHERE-clause body for a logs query from the
// free-text filter expression plus the optional timestamp/id range bounds.
// Tokens carry their own leading "and" connectors and trailing spaces, so the
// result is a plain concatenation. (A leftover `return &filterSql, nil` line
// from the removed ParseLogFilter was dropped.)
func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilterParams) (*string, error) {
	var tokens []string
	// Guard against a request without the "q" parameter: ParseFilterParams
	// only sets Query when present, and dereferencing nil would panic.
	if params.Query != nil {
		var err error
		tokens, err = parseLogQuery(*params.Query)
		if err != nil {
			return nil, err
		}
		tokens, err = replaceInterestingFields(allFields, tokens)
		if err != nil {
			return nil, err
		}
	}
	if params.TimestampStart != nil {
		tokens = append(tokens, fmt.Sprintf("and timestamp >= '%d' ", *params.TimestampStart))
	}
	if params.TimestampEnd != nil {
		tokens = append(tokens, fmt.Sprintf("and timestamp <= '%d' ", *params.TimestampEnd))
	}
	if params.IdStart != nil {
		tokens = append(tokens, fmt.Sprintf("and id > '%v' ", *params.IdStart))
	}
	if params.IdEnd != nil {
		tokens = append(tokens, fmt.Sprintf("and id < '%v' ", *params.IdEnd))
	}
	sqlWhere := strings.Join(tokens, "")
	return &sqlWhere, nil
}

View File

@ -0,0 +1,209 @@
package logs
import (
"testing"
. "github.com/smartystreets/goconvey/convey"
"go.signoz.io/query-service/model"
)
// correctQueriesTest enumerates filter queries the parser must accept, paired
// with the exact SQL tokens parseLogQuery is expected to emit. The trailing
// spaces inside the expected tokens are significant — callers join tokens
// with no separator.
var correctQueriesTest = []struct {
	Name          string
	InputQuery    string
	WantSqlTokens []string
}{
	{
		`filter with fulltext`,
		`OPERATION in ('bcd') AND FULLTEXT contains 'searchstring'`,
		[]string{`OPERATION IN ('bcd') `, `AND body ILIKE '%searchstring%' `},
	},
	{
		`fulltext`,
		`searchstring`,
		[]string{`body ILIKE '%searchstring%' `},
	},
	{
		`filters with lt,gt,lte,gte operators`,
		`id lt 100 and id gt 50 and code lte 500 and code gte 400`,
		[]string{`id < 100 `, `and id > 50 `, `and code <= 500 `, `and code >= 400 `},
	},
	{
		`filter with number`,
		`status gte 200 AND FULLTEXT ncontains '"key"'`,
		[]string{`status >= 200 `, `AND body NOT ILIKE '%"key"%' `},
	},
	{
		`characters inside string`,
		`service NIN ('name > 100') AND length gt 100`,
		[]string{`service NOT IN ('name > 100') `, `AND length > 100 `},
	},
	{
		// An "in" without a parenthesized list is not a valid filter, so the
		// whole string falls back to full-text search.
		`fulltext with in`,
		`key in 2`,
		[]string{`body ILIKE '%key in 2%' `},
	},
	{
		`not valid fulltext but a filter`,
		`key in (2,3)`,
		[]string{`key IN (2,3) `},
	},
	{
		`filters with extra spaces`,
		`service IN ('name > 100') AND length gt 100`,
		[]string{`service IN ('name > 100') `, `AND length > 100 `},
	},
	{
		`filters with special characters in key name`,
		`id.userid in (100) and id_userid gt 50`,
		[]string{`id.userid IN (100) `, `and id_userid > 50 `},
	},
}
// TestParseLogQueryCorrect runs parseLogQuery over the accepted-query table
// and compares the produced SQL tokens exactly.
func TestParseLogQueryCorrect(t *testing.T) {
	for _, tc := range correctQueriesTest {
		Convey(tc.Name, t, func() {
			tokens, _ := parseLogQuery(tc.InputQuery)
			So(tokens, ShouldResemble, tc.WantSqlTokens)
		})
	}
}
// incorrectQueries lists inputs parseLogQuery must reject: each leaves stray
// text behind after all recognizable filter expressions are stripped.
var incorrectQueries = []struct {
	Name  string
	Query string
}{
	{
		"filter without a key",
		"OPERATION in ('bcd') AND 'abcd' FULLTEXT contains 'helloxyz'",
	},
	{
		"fulltext without fulltext keyword",
		"OPERATION in ('bcd') AND 'searchstring'",
	},
	{
		"fulltext in the beginning without keyword",
		"'searchstring and OPERATION in ('bcd')",
	},
}
// TestParseLogQueryInCorrect verifies that malformed queries produce an error.
func TestParseLogQueryInCorrect(t *testing.T) {
	for _, tc := range incorrectQueries {
		Convey(tc.Name, t, func() {
			_, err := parseLogQuery(tc.Query)
			So(err, ShouldBeError)
		})
	}
}
// parseCorrectColumns maps single filter tokens to the column name parseColumn
// must extract. Fix: the "column with not ilike" case previously duplicated
// the plain ILIKE filter and never exercised NOT ILIKE at all.
var parseCorrectColumns = []struct {
	Name   string
	Filter string
	Column string
}{
	{
		"column with IN operator",
		"id.userid IN (100) ",
		"id.userid",
	},
	{
		"column with NOT IN operator",
		"service NOT IN ('name > 100') ",
		"service",
	},
	{
		"column with > operator",
		"and id_userid > 50 ",
		"id_userid",
	},
	{
		"column with < operator",
		"and id_userid < 50 ",
		"id_userid",
	},
	{
		"column with <= operator",
		"and id_userid <= 50 ",
		"id_userid",
	},
	{
		"column with >= operator",
		"and id_userid >= 50 ",
		"id_userid",
	},
	{
		"column with ilike",
		`AND body ILIKE '%searchstring%' `,
		"body",
	},
	{
		"column with not ilike",
		`AND body NOT ILIKE '%searchstring%' `,
		"body",
	},
}
// TestParseColumn checks column extraction from individual filter tokens.
func TestParseColumn(t *testing.T) {
	for _, tc := range parseCorrectColumns {
		Convey(tc.Name, t, func() {
			got, _ := parseColumn(tc.Filter)
			So(*got, ShouldEqual, tc.Column)
		})
	}
}
// TestReplaceInterestingFields verifies that interesting-field references are
// rewritten to the indexOf attribute lookup while selected fields and the
// body column pass through untouched.
func TestReplaceInterestingFields(t *testing.T) {
	fields := model.GetFieldsResponse{
		Selected: []model.LogField{
			{
				Name:     "id_key",
				DataType: "int64",
				Type:     "attributes",
			},
		},
		Interesting: []model.LogField{
			{
				Name:     "id.userid",
				DataType: "int64",
				Type:     "attributes",
			},
		},
	}
	input := []string{"id.userid IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`}
	want := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`}
	Convey("testInterestingFields", t, func() {
		got, _ := replaceInterestingFields(&fields, input)
		So(got, ShouldResemble, want)
	})
}
// TestGenerateSQLQuery exercises end-to-end WHERE-clause generation: operator
// substitution, interesting-field rewriting, and the timestamp/id bounds.
func TestGenerateSQLQuery(t *testing.T) {
	fields := model.GetFieldsResponse{
		Selected: []model.LogField{
			{
				Name:     "id",
				DataType: "int64",
				Type:     "attributes",
			},
		},
		Interesting: []model.LogField{
			{
				Name:     "code",
				DataType: "int64",
				Type:     "attributes",
			},
		},
	}
	q := "id lt 100 and id gt 50 and code lte 500 and code gte 400"
	startTs := int64(1657689292000)
	endTs := int64(1657689294000)
	startID := "2BsKLKv8cZrLCn6rkOcRGkdjBdM"
	endID := "2BsKG6tRpFWjYMcWsAGKfSxoQdU"
	want := "id < 100 and id > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 and timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' "
	Convey("testInterestingFields", t, func() {
		got, _ := GenerateSQLWhere(&fields, &model.LogsFilterParams{Query: &q, TimestampStart: &startTs, TimestampEnd: &endTs, IdStart: &startID, IdEnd: &endID})
		So(*got, ShouldEqual, want)
	})
}

View File

@ -38,28 +38,3 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error {
}
return nil
}
// ValidateFilters checks every legacy JSON filter: column, operation, and at
// least one value must be present, and the operation must come from the
// supported operator set.
func ValidateFilters(filters *[]model.LogFilter) error {
	regex, err := regexp.Compile("^(gte|lte|gt|lt|eq|neq|in|like|ilike)$")
	if err != nil {
		return err
	}
	for _, f := range *filters {
		switch {
		case f.Column == "":
			return fmt.Errorf("col cannot be empty")
		case f.Operation == "":
			return fmt.Errorf("op cannot be empty")
		case len(f.Value) == 0:
			return fmt.Errorf("val cannot be empty")
		case !regex.MatchString(f.Operation):
			return fmt.Errorf("op type %s not supported", f.Operation)
		}
	}
	return nil
}

View File

@ -201,6 +201,11 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.ResponseWriter.WriteHeader(code)
}
// Flush implements the http.Flush interface.
// Flush implements http.Flusher so streaming handlers (e.g. the SSE logs
// tail) can push buffered data through the logging wrapper. The assertion is
// guarded: the original bare type assertion would panic the request goroutine
// whenever the wrapped writer does not support flushing.
func (lrw *loggingResponseWriter) Flush() {
	if flusher, ok := lrw.ResponseWriter.(http.Flusher); ok {
		flusher.Flush()
	}
}
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r)

View File

@ -64,4 +64,5 @@ type Reader interface {
GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError)
TailLogs(ctx context.Context, client *model.LogsTailClient) *model.ApiError
}

View File

@ -313,15 +313,13 @@ type UpdateField struct {
IndexGranularity *int `json:"indexGranularity"`
}
// LogFilter is a single column/operation/value predicate from the legacy
// JSON filter payload (superseded by the free-text `q` query parameter).
type LogFilter struct {
	Column    string        `json:"col"`
	Operation string        `json:"op"`
	Value     []interface{} `json:"val"`
}
// LogsFilterParams carries the parsed query-string parameters of a logs
// request: paging/ordering, the free-text filter expression, and optional
// timestamp/id range bounds. Pointer fields distinguish "absent" from the
// zero value. (Diff residue had duplicated Limit/OrderBy/Order fields, which
// is a compile error; this is the deduplicated struct.)
type LogsFilterParams struct {
	Limit          int     `json:"limit"`
	OrderBy        string  `json:"orderBy"`
	Order          string  `json:"order"`
	Query          *string `json:"q"`
	TimestampStart *int64  `json:"timestampStart"`
	TimestampEnd   *int64  `json:"timestampEnd"`
	IdStart        *string `json:"idStart"`
	IdEnd          *string `json:"idEnd"`
}

View File

@ -18,18 +18,19 @@ type ApiError struct {
// ErrorType classifies API errors into coarse categories that the transport
// layer maps onto HTTP responses.
type ErrorType string

// (Diff residue duplicated every constant from ErrorNone through
// ErrorConflict, a redeclaration error; this is the deduplicated set plus the
// new ErrorStreamingNotSupported used by the SSE logs-tail handler.)
const (
	ErrorNone                  ErrorType = ""
	ErrorTimeout               ErrorType = "timeout"
	ErrorCanceled              ErrorType = "canceled"
	ErrorExec                  ErrorType = "execution"
	ErrorBadData               ErrorType = "bad_data"
	ErrorInternal              ErrorType = "internal"
	ErrorUnavailable           ErrorType = "unavailable"
	ErrorNotFound              ErrorType = "not_found"
	ErrorNotImplemented        ErrorType = "not_implemented"
	ErrorUnauthorized          ErrorType = "unauthorized"
	ErrorForbidden             ErrorType = "forbidden"
	ErrorConflict              ErrorType = "conflict"
	ErrorStreamingNotSupported ErrorType = "streaming is not supported"
)
type QueryDataV2 struct {
@ -404,3 +405,9 @@ type GetLogsResponse struct {
Attributes_int64 map[string]int64 `json:"attributesInt" ch:"attributes_int64"`
Attributes_float64 map[string]float64 `json:"attributesFloat" ch:"attributes_float64"`
}
// LogsTailClient represents one live log-tail subscriber: the reader pushes
// serialized log lines into Logs and signals end-of-stream on Done.
type LogsTailClient struct {
	// Name identifies the client; the handler currently uses the remote address.
	Name string
	// Logs carries streamed log lines.
	// NOTE(review): pointer element types (chan *string, chan *bool) look
	// unnecessary for immutable values — confirm and consider plain chan
	// string / chan struct{} in a follow-up.
	Logs chan *string
	Done chan *bool
}