diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index ac41d049b0..687fcb3e46 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -3088,7 +3088,7 @@ func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilter return nil, apiErr } - filterSql, err := logs.ParseLogFilter(fields, &params.Filters) + filterSql, err := logs.GenerateSQLWhere(fields, params) if err != nil { return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData} } @@ -3114,3 +3114,21 @@ func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilter return response, nil } + +func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) *model.ApiError { + for i := 0; i < 10; i++ { + select { + case <-ctx.Done(): + return nil + default: + data := fmt.Sprintf("hello log %d", i) + client.Logs <- &data + time.Sleep(time.Second) + } + } + done := true + client.Done <- &done + fmt.Println("done in the tail logs") + + return nil +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index c4ebeccaa9..92caf4ed69 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1,6 +1,7 @@ package app import ( + "bytes" "context" "encoding/json" "errors" @@ -1822,6 +1823,7 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) { subRouter := router.PathPrefix("/api/v1/logs").Subrouter() subRouter.HandleFunc("", ViewAccess(aH.getLogs)).Methods(http.MethodGet) + subRouter.HandleFunc("/tail", ViewAccess(aH.tailLogs)).Methods(http.MethodGet) subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet) subRouter.HandleFunc("/fields", ViewAccess(aH.logFieldUpdate)).Methods(http.MethodPost) } @@ -1866,13 +1868,6 @@ func (aH *APIHandler) getLogs(w 
http.ResponseWriter, r *http.Request) { return } - err = logs.ValidateFilters(&params.Filters) - if err != nil { - apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} - respondError(w, apiErr, "Incorrect filter") - return - } - res, apiErr := (*aH.reader).GetLogs(r.Context(), params) if apiErr != nil { respondError(w, apiErr, "Failed to fetch logs from the DB") @@ -1880,3 +1875,37 @@ func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) { } aH.writeJSON(w, r, map[string]interface{}{"results": res}) } + +func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) { + client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *string, 100), Done: make(chan *bool)} + go (*aH.reader).TailLogs(r.Context(), client) + + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(200) + + flusher, ok := w.(http.Flusher) + if !ok { + err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil} + respondError(w, &err, "streaming is not supported") + return + } + + loop := true + for loop { + select { + case ev := <-client.Logs: + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.Encode(ev) + fmt.Fprintf(w, "data: %v\n\n", buf.String()) + fmt.Printf("data: %v\n", buf.String()) + flusher.Flush() + case <-client.Done: + fmt.Println("done!") + return + } + } +} diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go index a7975cdd8d..61d1d2840b 100644 --- a/pkg/query-service/app/logs/parser.go +++ b/pkg/query-service/app/logs/parser.go @@ -1,9 +1,9 @@ package logs import ( - "encoding/json" "fmt" "net/http" + "regexp" "strconv" "strings" @@ -11,24 +11,18 @@ import ( ) var operatorMapping = map[string]string{ - "eq": "=", - "neq": "!=", - "lt": "<", - "gt": ">", - "lte": "<=", - "gte": ">=", - "in": "in", - "like": "like", - "ilike": 
"ilike", + "lt": "<", + "gt": ">", + "lte": "<=", + "gte": ">=", + "in": "IN", + "nin": "NOT IN", + "contains": "ILIKE", + "ncontains": "NOT ILIKE", } -func arrayToMap(fields []model.LogField) map[string]model.LogField { - res := map[string]model.LogField{} - for _, field := range fields { - res[field.Name] = field - } - return res -} +var tokenRegex, _ = regexp.Compile(`(?i)(and( )*?)?(([\w.-]+ (in|nin) \(["\w.,' \-><]+\))|([\w.-]+ (gt|lt|gte|lte|contains|ncontains) ("|')?\S+("|')?))`) +var operatorRegex, _ = regexp.Compile(`(?i)(?: )(in|nin|gt|lt|gte|lte|contains|ncontains)(?: )`) func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) { res := model.LogsFilterParams{ @@ -38,7 +32,6 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) { } var err error params := r.URL.Query() - filters := []model.LogFilter{} if val, ok := params["limit"]; ok { res.Limit, err = strconv.Atoi(val[0]) if err != nil { @@ -51,50 +44,144 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) { if val, ok := params["order"]; ok { res.Order = val[0] } - if val, ok := params["filter"]; ok { - err := json.Unmarshal([]byte(val[0]), &filters) + if val, ok := params["q"]; ok { + res.Query = &val[0] + } + if val, ok := params["timestampStart"]; ok { + ts, err := strconv.Atoi(val[0]) if err != nil { return nil, err } + ts64 := int64(ts) + res.TimestampStart = &ts64 + } + if val, ok := params["timestampEnd"]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + ts64 := int64(ts) + res.TimestampEnd = &ts64 + } + if val, ok := params["idStart"]; ok { + res.IdStart = &val[0] + } + if val, ok := params["idEnd"]; ok { + res.IdEnd = &val[0] } - res.Filters = filters return &res, nil } -func ParseLogFilter(allFields *model.GetFieldsResponse, filters *[]model.LogFilter) (*string, error) { - fLen := len(*filters) - if fLen <= 0 { - return nil, nil +func parseLogQuery(query string) ([]string, error) { + sqlQueryTokens 
:= []string{} + filterTokens := tokenRegex.FindAllString(query, -1) + + if len(filterTokens) == 0 { + sqlQueryTokens = append(sqlQueryTokens, fmt.Sprintf("body ILIKE '%%%s%%' ", query)) + return sqlQueryTokens, nil } + // replace and check if there is something that is lying around + if len(strings.TrimSpace(tokenRegex.ReplaceAllString(query, ""))) > 0 { + return nil, fmt.Errorf("failed to parse query, contains unknown tokens") + } + + for _, v := range filterTokens { + op := strings.TrimSpace(operatorRegex.FindString(v)) + + if strings.ToLower(op) == "contains" { + searchString := strings.TrimSpace(strings.Split(v, op)[1]) + sqlQueryTokens = append(sqlQueryTokens, fmt.Sprintf(`AND body ILIKE '%%%s%%' `, searchString[1:len(searchString)-1])) + } else if strings.ToLower(op) == "ncontains" { + searchString := strings.TrimSpace(strings.Split(v, op)[1]) + sqlQueryTokens = append(sqlQueryTokens, fmt.Sprintf(`AND body NOT ILIKE '%%%s%%' `, searchString[1:len(searchString)-1])) + } else { + symbol := operatorMapping[strings.ToLower(op)] + sqlQueryTokens = append(sqlQueryTokens, strings.Replace(v, " "+op+" ", " "+symbol+" ", 1)+" ") + } + } + + return sqlQueryTokens, nil +} + +func parseColumn(s string) (*string, error) { + s = strings.ToLower(s) + + colName := "" + + // if has and/or as prefix + filter := strings.Split(s, " ") + if len(filter) < 3 { + return nil, fmt.Errorf("incorrect filter") + } + + if strings.HasPrefix(s, "and") { + colName = filter[1] + } else { + colName = filter[0] + } + + return &colName, nil +} + +func arrayToMap(fields []model.LogField) map[string]model.LogField { + res := map[string]model.LogField{} + for _, field := range fields { + res[field.Name] = field + } + return res +} + +func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens []string) ([]string, error) { + // check if cols selectedFieldsLookup := arrayToMap(allFields.Selected) interestingFieldLookup := arrayToMap(allFields.Interesting) - filterSql := "" - for 
fIndx := 0; fIndx < fLen; fIndx++ { - filter := (*filters)[fIndx] - fieldSQLName := filter.Column - if _, ok := selectedFieldsLookup[filter.Column]; !ok { - if field, ok := interestingFieldLookup[filter.Column]; ok { - fieldSQLName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), filter.Column) + + for index := 0; index < len(queryTokens); index++ { + queryToken := queryTokens[index] + col, err := parseColumn(queryToken) + if err != nil { + return nil, err + } + + sqlColName := *col + if _, ok := selectedFieldsLookup[*col]; !ok && *col != "body" { + if field, ok := interestingFieldLookup[*col]; ok { + sqlColName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), *col) } else { return nil, fmt.Errorf("field not found for filtering") } } + queryTokens[index] = strings.Replace(queryToken, *col, sqlColName, 1) + } + return queryTokens, nil +} - filterSql += "(" - vLen := len(filter.Value) - for i := 0; i < vLen; i++ { - filterSql += fmt.Sprintf("%s%s'%v'", fieldSQLName, operatorMapping[filter.Operation], filter.Value[i]) - if i != vLen-1 { - filterSql += " or " - } - } - filterSql += ")" - - if fIndx != fLen-1 { - filterSql += " and " - } +func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilterParams) (*string, error) { + tokens, err := parseLogQuery(*params.Query) + if err != nil { + return nil, err } - return &filterSql, nil + tokens, err = replaceInterestingFields(allFields, tokens) + if err != nil { + return nil, err + } + + if params.TimestampStart != nil { + tokens = append(tokens, fmt.Sprintf("and timestamp >= '%d' ", *params.TimestampStart)) + } + if params.TimestampEnd != nil { + tokens = append(tokens, fmt.Sprintf("and timestamp <= '%d' ", *params.TimestampEnd)) + } + if params.IdStart != nil { + tokens = append(tokens, fmt.Sprintf("and id > '%v' 
", *params.IdStart)) + } + if params.IdEnd != nil { + tokens = append(tokens, fmt.Sprintf("and id < '%v' ", *params.IdEnd)) + } + + sqlWhere := strings.Join(tokens, "") + + return &sqlWhere, nil } diff --git a/pkg/query-service/app/logs/parser_test.go b/pkg/query-service/app/logs/parser_test.go new file mode 100644 index 0000000000..d1467b4ee3 --- /dev/null +++ b/pkg/query-service/app/logs/parser_test.go @@ -0,0 +1,209 @@ +package logs + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + "go.signoz.io/query-service/model" +) + +var correctQueriesTest = []struct { + Name string + InputQuery string + WantSqlTokens []string +}{ + { + `filter with fulltext`, + `OPERATION in ('bcd') AND FULLTEXT contains 'searchstring'`, + []string{`OPERATION IN ('bcd') `, `AND body ILIKE '%searchstring%' `}, + }, + { + `fulltext`, + `searchstring`, + []string{`body ILIKE '%searchstring%' `}, + }, + { + `filters with lt,gt,lte,gte operators`, + `id lt 100 and id gt 50 and code lte 500 and code gte 400`, + []string{`id < 100 `, `and id > 50 `, `and code <= 500 `, `and code >= 400 `}, + }, + { + `filter with number`, + `status gte 200 AND FULLTEXT ncontains '"key"'`, + []string{`status >= 200 `, `AND body NOT ILIKE '%"key"%' `}, + }, + { + `characters inside string`, + `service NIN ('name > 100') AND length gt 100`, + []string{`service NOT IN ('name > 100') `, `AND length > 100 `}, + }, + { + `fulltext with in`, + `key in 2`, + []string{`body ILIKE '%key in 2%' `}, + }, + { + `not valid fulltext but a filter`, + `key in (2,3)`, + []string{`key IN (2,3) `}, + }, + { + `filters with extra spaces`, + `service IN ('name > 100') AND length gt 100`, + []string{`service IN ('name > 100') `, `AND length > 100 `}, + }, + { + `filters with special characters in key name`, + `id.userid in (100) and id_userid gt 50`, + []string{`id.userid IN (100) `, `and id_userid > 50 `}, + }, +} + +func TestParseLogQueryCorrect(t *testing.T) { + for _, test := range correctQueriesTest { + 
Convey(test.Name, t, func() { + query, _ := parseLogQuery(test.InputQuery) + + So(query, ShouldResemble, test.WantSqlTokens) + }) + } +} + +var incorrectQueries = []struct { + Name string + Query string +}{ + { + "filter without a key", + "OPERATION in ('bcd') AND 'abcd' FULLTEXT contains 'helloxyz'", + }, + { + "fulltext without fulltext keyword", + "OPERATION in ('bcd') AND 'searchstring'", + }, + { + "fulltext in the beginning without keyword", + "'searchstring and OPERATION in ('bcd')", + }, +} + +func TestParseLogQueryInCorrect(t *testing.T) { + for _, test := range incorrectQueries { + Convey(test.Name, t, func() { + _, err := parseLogQuery(test.Query) + So(err, ShouldBeError) + }) + } +} + +var parseCorrectColumns = []struct { + Name string + Filter string + Column string +}{ + { + "column with IN operator", + "id.userid IN (100) ", + "id.userid", + }, + { + "column with NOT IN operator", + "service NOT IN ('name > 100') ", + "service", + }, + { + "column with > operator", + "and id_userid > 50 ", + "id_userid", + }, + { + "column with < operator", + "and id_userid < 50 ", + "id_userid", + }, + { + "column with <= operator", + "and id_userid <= 50 ", + "id_userid", + }, + { + "column with >= operator", + "and id_userid >= 50 ", + "id_userid", + }, + { + "column with ilike", + `AND body ILIKE '%searchstring%' `, + "body", + }, + { + "column with not ilike", + `AND body ILIKE '%searchstring%' `, + "body", + }, +} + +func TestParseColumn(t *testing.T) { + for _, test := range parseCorrectColumns { + Convey(test.Name, t, func() { + column, _ := parseColumn(test.Filter) + So(*column, ShouldEqual, test.Column) + }) + } +} + +func TestReplaceInterestingFields(t *testing.T) { + queryTokens := []string{"id.userid IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`} + allFields := model.GetFieldsResponse{ + Selected: []model.LogField{ + model.LogField{ + Name: "id_key", + DataType: "int64", + Type: "attributes", + }, + }, + Interesting: 
[]model.LogField{ + model.LogField{ + Name: "id.userid", + DataType: "int64", + Type: "attributes", + }, + }, + } + + expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`} + Convey("testInterestingFields", t, func() { + tokens, _ := replaceInterestingFields(&allFields, queryTokens) + So(tokens, ShouldResemble, expectedTokens) + }) +} + +func TestGenerateSQLQuery(t *testing.T) { + allFields := model.GetFieldsResponse{ + Selected: []model.LogField{ + { + Name: "id", + DataType: "int64", + Type: "attributes", + }, + }, + Interesting: []model.LogField{ + { + Name: "code", + DataType: "int64", + Type: "attributes", + }, + }, + } + + query := "id lt 100 and id gt 50 and code lte 500 and code gte 400" + tsStart := int64(1657689292000) + tsEnd := int64(1657689294000) + idStart := "2BsKLKv8cZrLCn6rkOcRGkdjBdM" + idEnd := "2BsKG6tRpFWjYMcWsAGKfSxoQdU" + sqlWhere := "id < 100 and id > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 and timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' " + Convey("testInterestingFields", t, func() { + res, _ := GenerateSQLWhere(&allFields, &model.LogsFilterParams{Query: &query, TimestampStart: &tsStart, TimestampEnd: &tsEnd, IdStart: &idStart, IdEnd: &idEnd}) + So(*res, ShouldEqual, sqlWhere) + }) +} diff --git a/pkg/query-service/app/logs/validator.go b/pkg/query-service/app/logs/validator.go index 99858969e8..9277bda047 100644 --- a/pkg/query-service/app/logs/validator.go +++ b/pkg/query-service/app/logs/validator.go @@ -38,28 +38,3 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error { } return nil } - -func ValidateFilters(filters *[]model.LogFilter) error { - opsRegex := "^(gte|lte|gt|lt|eq|neq|in|like|ilike)$" - regex, err := 
regexp.Compile(opsRegex) - if err != nil { - return err - } - for _, val := range *filters { - if val.Column == "" { - return fmt.Errorf("col cannot be empty") - } - if val.Operation == "" { - return fmt.Errorf("op cannot be empty") - } - if len(val.Value) == 0 { - return fmt.Errorf("val cannot be empty") - } - - matched := regex.MatchString(val.Operation) - if !matched { - return fmt.Errorf("op type %s not supported", val.Operation) - } - } - return nil -} diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 2d23531c41..ea9a182786 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -201,6 +201,11 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) { lrw.ResponseWriter.WriteHeader(code) } +// Flush implements the http.Flush interface. +func (lrw *loggingResponseWriter) Flush() { + lrw.ResponseWriter.(http.Flusher).Flush() +} + func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { route := mux.CurrentRoute(r) diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index bb5a428a0e..c8d9b1cfcf 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -64,4 +64,5 @@ type Reader interface { GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) + TailLogs(ctx context.Context, client *model.LogsTailClient) *model.ApiError } diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index c39c3eb802..1df7813679 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -313,15 +313,13 @@ type UpdateField struct { IndexGranularity *int 
`json:"indexGranularity"` } -type LogFilter struct { - Column string `json:"col"` - Operation string `json:"op"` - Value []interface{} `json:"val"` -} - type LogsFilterParams struct { - Limit int `json:"limit"` - OrderBy string `json:"orderBy"` - Order string `json:"order"` - Filters []LogFilter `json:"filters"` + Limit int `json:"limit"` + OrderBy string `json:"orderBy"` + Order string `json:"order"` + Query *string `json:"q"` + TimestampStart *int64 `json:"timestampStart"` + TimestampEnd *int64 `json:"timestampEnd"` + IdStart *string `json:"idStart"` + IdEnd *string `json:"idEnd"` } diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index f6120f34af..7f6020c114 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -18,18 +18,19 @@ type ApiError struct { type ErrorType string const ( - ErrorNone ErrorType = "" - ErrorTimeout ErrorType = "timeout" - ErrorCanceled ErrorType = "canceled" - ErrorExec ErrorType = "execution" - ErrorBadData ErrorType = "bad_data" - ErrorInternal ErrorType = "internal" - ErrorUnavailable ErrorType = "unavailable" - ErrorNotFound ErrorType = "not_found" - ErrorNotImplemented ErrorType = "not_implemented" - ErrorUnauthorized ErrorType = "unauthorized" - ErrorForbidden ErrorType = "forbidden" - ErrorConflict ErrorType = "conflict" + ErrorNone ErrorType = "" + ErrorTimeout ErrorType = "timeout" + ErrorCanceled ErrorType = "canceled" + ErrorExec ErrorType = "execution" + ErrorBadData ErrorType = "bad_data" + ErrorInternal ErrorType = "internal" + ErrorUnavailable ErrorType = "unavailable" + ErrorNotFound ErrorType = "not_found" + ErrorNotImplemented ErrorType = "not_implemented" + ErrorUnauthorized ErrorType = "unauthorized" + ErrorForbidden ErrorType = "forbidden" + ErrorConflict ErrorType = "conflict" + ErrorStreamingNotSupported ErrorType = "streaming is not supported" ) type QueryDataV2 struct { @@ -404,3 +405,9 @@ type GetLogsResponse struct { Attributes_int64 
map[string]int64 `json:"attributesInt" ch:"attributes_int64"` Attributes_float64 map[string]float64 `json:"attributesFloat" ch:"attributes_float64"` } + +type LogsTailClient struct { + Name string + Logs chan *string + Done chan *bool +}