From 2450fff34d27a27749149e4297ae3932df1c3ab8 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Mon, 18 Jul 2022 18:55:52 +0530 Subject: [PATCH] live tail v1 --- .../app/clickhouseReader/reader.go | 88 ++++++++++++++++--- pkg/query-service/app/http_handler.go | 15 +++- pkg/query-service/app/logs/parser.go | 36 ++++++-- pkg/query-service/interfaces/interface.go | 2 +- pkg/query-service/model/queryParams.go | 4 +- pkg/query-service/model/response.go | 8 +- 6 files changed, 128 insertions(+), 25 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 4060b08eed..0c8581e8f6 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2883,20 +2883,86 @@ func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilter return response, nil } -func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) *model.ApiError { - for i := 0; i < 10; i++ { +func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) { + response := &[]model.GetLogsResponse{} + fields, apiErr := r.GetLogFields(ctx) + if apiErr != nil { + client.Error <- apiErr.Err + return + } + + filterSql, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{ + Query: client.Filter.Query, + }) + + if err != nil { + client.Error <- err + return + } + + query := fmt.Sprintf("SELECT "+ + "timestamp, observed_timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,"+ + "CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,"+ + "CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,"+ + "CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,"+ + "CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "+ + "from %s.%s", r.logsDB, r.logsTable) + + currentTime := uint64(time.Now().UnixNano() / int64(time.Millisecond)) + tsStart := ¤tTime + if client.Filter.TimestampStart != nil { + tsStart = client.Filter.TimestampStart + } + + var idStart *string + if client.Filter.IdStart != nil { + idStart = client.Filter.IdStart + } + + for { select { case <-ctx.Done(): - return nil + done := true + client.Done <- &done + zap.S().Debug("closing go routine : " + client.Name) + return default: - data := fmt.Sprintf("hello log %d", i) - client.Logs <- &data - time.Sleep(time.Second) + tmpQuery := fmt.Sprintf("%s where timestamp >='%d'", query, *tsStart) + if filterSql != nil && *filterSql != "" { + tmpQuery += fmt.Sprintf(" and %s", *filterSql) + } + if idStart != nil { + tmpQuery += fmt.Sprintf(" and id > '%s'", *idStart) + } + tmpQuery = fmt.Sprintf("%s order by timestamp asc limit 1000", tmpQuery) + zap.S().Debug(tmpQuery) + err := r.db.Select(ctx, response, tmpQuery) + if err != nil { + zap.S().Debug(err) + client.Error <- err + return + } + len := len(*response) + for i := 0; i < len; i++ { + select { + case <-ctx.Done(): + done := true + client.Done <- &done + zap.S().Debug("closing go routine while sending logs : " + client.Name) + return + default: + client.Logs <- &(*response)[i] + if i == len-1 { + tsStart = &(*response)[i].Timestamp + idStart = &(*response)[i].ID + } + } + } + if len == 0 { + currentTime := uint64(time.Now().UnixNano() / int64(time.Millisecond)) + tsStart = ¤tTime + } + time.Sleep(2 * time.Second) } } - done := true - client.Done <- &done - fmt.Println("done in the tail logs") - - return nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 8e5233d744..32113d801a 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1903,7 +1903,7 @@ func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) { } func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) { - params, err := logs.ParseFilterParams(r) + params, err := logs.ParseLogFilterParams(r) if err != nil { apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} respondError(w, apiErr, "Incorrect params") @@ -1919,7 +1919,15 @@ func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) { } func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) { - client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *string, 100), Done: make(chan *bool)} + params, err := logs.ParseLogFilterParams(r) + if err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + respondError(w, apiErr, "Incorrect params") + return + } + + // create the client + client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params} go (*aH.reader).TailLogs(r.Context(), client) w.Header().Set("Connection", "keep-alive") @@ -1948,6 +1956,9 @@ func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) { case <-client.Done: fmt.Println("done!") return + case <-client.Error: + fmt.Println("error occured!") + return } } } diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go index 61d1d2840b..1bbab36466 100644 --- a/pkg/query-service/app/logs/parser.go +++ b/pkg/query-service/app/logs/parser.go @@ -24,7 +24,7 @@ var operatorMapping = map[string]string{ var tokenRegex, _ = regexp.Compile(`(?i)(and( )*?)?(([\w.-]+ (in|nin) \(["\w.,' \-><]+\))|([\w.-]+ (gt|lt|gte|lte|contains|ncontains) ("|')?\S+("|')?))`) var operatorRegex, _ = regexp.Compile(`(?i)(?: )(in|nin|gt|lt|gte|lte|contains|ncontains)(?: )`) -func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) { +func ParseLogFilterParams(r *http.Request) (*model.LogsFilterParams, error) { res := model.LogsFilterParams{ Limit: 30, OrderBy: "timestamp", @@ -52,7 +52,7 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) { if err != nil { return nil, err } - ts64 := int64(ts) + ts64 := uint64(ts) res.TimestampStart = &ts64 } if val, ok := params["timestampEnd"]; ok { @@ -60,7 +60,7 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) { if err != nil { return nil, err } - ts64 := int64(ts) + ts64 := uint64(ts) res.TimestampEnd = &ts64 } if val, ok := params["idStart"]; ok { @@ -72,6 +72,26 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) { return &res, nil } +func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error) { + res := model.LogsFilterParams{} + params := r.URL.Query() + if val, ok := params["q"]; ok { + res.Query = &val[0] + } + if val, ok := params["timestampStart"]; ok { + ts, err := strconv.Atoi(val[0]) + if err != nil { + return nil, err + } + ts64 := uint64(ts) + res.TimestampStart = &ts64 + } + if val, ok := params["idStart"]; ok { + res.IdStart = &val[0] + } + return &res, nil +} + func parseLogQuery(query string) ([]string, error) { sqlQueryTokens := []string{} filterTokens := tokenRegex.FindAllString(query, -1) @@ -158,9 +178,13 @@ func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens [] } func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilterParams) (*string, error) { - tokens, err := parseLogQuery(*params.Query) - if err != nil { - return nil, err + var tokens []string + var err error + if params.Query != nil { + tokens, err = parseLogQuery(*params.Query) + if err != nil { + return nil, err + } } tokens, err = replaceInterestingFields(allFields, tokens) diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 8519b09b04..b0429db9bb 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -62,7 +62,7 @@ type Reader interface { GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) - TailLogs(ctx context.Context, client *model.LogsTailClient) *model.ApiError + TailLogs(ctx context.Context, client *model.LogsTailClient) // Connection needed for rules, not ideal but required GetConn() clickhouse.Conn diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index d63a07eb71..b6059107d9 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -335,8 +335,8 @@ type LogsFilterParams struct { OrderBy string `json:"orderBy"` Order string `json:"order"` Query *string `json:"q"` - TimestampStart *int64 `json:"timestampStart"` - TimestampEnd *int64 `json:"timestampEnd"` + TimestampStart *uint64 `json:"timestampStart"` + TimestampEnd *uint64 `json:"timestampEnd"` IdStart *string `json:"idStart"` IdEnd *string `json:"idEnd"` } diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 8efb4f1069..b85955da74 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -437,7 +437,9 @@ type GetLogsResponse struct { } type LogsTailClient struct { - Name string - Logs chan *string - Done chan *bool + Name string + Logs chan *GetLogsResponse + Done chan *bool + Error chan error + Filter LogsFilterParams }