live tail v1

This commit is contained in:
nityanandagohain 2022-07-18 18:55:52 +05:30
parent df17d4ca54
commit 2450fff34d
6 changed files with 128 additions and 25 deletions

View File

@ -2883,20 +2883,86 @@ func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilter
return response, nil
}
func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) *model.ApiError {
for i := 0; i < 10; i++ {
func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) {
response := &[]model.GetLogsResponse{}
fields, apiErr := r.GetLogFields(ctx)
if apiErr != nil {
client.Error <- apiErr.Err
return
}
filterSql, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{
Query: client.Filter.Query,
})
if err != nil {
client.Error <- err
return
}
query := fmt.Sprintf("SELECT "+
"timestamp, observed_timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,"+
"CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,"+
"CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,"+
"CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,"+
"CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "+
"from %s.%s", r.logsDB, r.logsTable)
currentTime := uint64(time.Now().UnixNano() / int64(time.Millisecond))
tsStart := &currentTime
if client.Filter.TimestampStart != nil {
tsStart = client.Filter.TimestampStart
}
var idStart *string
if client.Filter.IdStart != nil {
idStart = client.Filter.IdStart
}
for {
select {
case <-ctx.Done():
return nil
done := true
client.Done <- &done
zap.S().Debug("closing go routine : " + client.Name)
return
default:
data := fmt.Sprintf("hello log %d", i)
client.Logs <- &data
time.Sleep(time.Second)
tmpQuery := fmt.Sprintf("%s where timestamp >='%d'", query, *tsStart)
if filterSql != nil && *filterSql != "" {
tmpQuery += fmt.Sprintf(" and %s", *filterSql)
}
if idStart != nil {
tmpQuery += fmt.Sprintf(" and id > '%s'", *idStart)
}
tmpQuery = fmt.Sprintf("%s order by timestamp asc limit 1000", tmpQuery)
zap.S().Debug(tmpQuery)
err := r.db.Select(ctx, response, tmpQuery)
if err != nil {
zap.S().Debug(err)
client.Error <- err
return
}
len := len(*response)
for i := 0; i < len; i++ {
select {
case <-ctx.Done():
done := true
client.Done <- &done
zap.S().Debug("closing go routine while sending logs : " + client.Name)
return
default:
client.Logs <- &(*response)[i]
if i == len-1 {
tsStart = &(*response)[i].Timestamp
idStart = &(*response)[i].ID
}
}
}
if len == 0 {
currentTime := uint64(time.Now().UnixNano() / int64(time.Millisecond))
tsStart = &currentTime
}
time.Sleep(2 * time.Second)
}
}
done := true
client.Done <- &done
fmt.Println("done in the tail logs")
return nil
}

View File

@ -1903,7 +1903,7 @@ func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
params, err := logs.ParseFilterParams(r)
params, err := logs.ParseLogFilterParams(r)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Incorrect params")
@ -1919,7 +1919,15 @@ func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *string, 100), Done: make(chan *bool)}
params, err := logs.ParseLogFilterParams(r)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Incorrect params")
return
}
// create the client
client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params}
go (*aH.reader).TailLogs(r.Context(), client)
w.Header().Set("Connection", "keep-alive")
@ -1948,6 +1956,9 @@ func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
case <-client.Done:
fmt.Println("done!")
return
case <-client.Error:
fmt.Println("error occured!")
return
}
}
}

View File

@ -24,7 +24,7 @@ var operatorMapping = map[string]string{
var tokenRegex, _ = regexp.Compile(`(?i)(and( )*?)?(([\w.-]+ (in|nin) \(["\w.,' \-><]+\))|([\w.-]+ (gt|lt|gte|lte|contains|ncontains) ("|')?\S+("|')?))`)
var operatorRegex, _ = regexp.Compile(`(?i)(?: )(in|nin|gt|lt|gte|lte|contains|ncontains)(?: )`)
func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
func ParseLogFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
res := model.LogsFilterParams{
Limit: 30,
OrderBy: "timestamp",
@ -52,7 +52,7 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
if err != nil {
return nil, err
}
ts64 := int64(ts)
ts64 := uint64(ts)
res.TimestampStart = &ts64
}
if val, ok := params["timestampEnd"]; ok {
@ -60,7 +60,7 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
if err != nil {
return nil, err
}
ts64 := int64(ts)
ts64 := uint64(ts)
res.TimestampEnd = &ts64
}
if val, ok := params["idStart"]; ok {
@ -72,6 +72,26 @@ func ParseFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
return &res, nil
}
func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
res := model.LogsFilterParams{}
params := r.URL.Query()
if val, ok := params["q"]; ok {
res.Query = &val[0]
}
if val, ok := params["timestampStart"]; ok {
ts, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
ts64 := uint64(ts)
res.TimestampStart = &ts64
}
if val, ok := params["idStart"]; ok {
res.IdStart = &val[0]
}
return &res, nil
}
func parseLogQuery(query string) ([]string, error) {
sqlQueryTokens := []string{}
filterTokens := tokenRegex.FindAllString(query, -1)
@ -158,9 +178,13 @@ func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens []
}
func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilterParams) (*string, error) {
tokens, err := parseLogQuery(*params.Query)
if err != nil {
return nil, err
var tokens []string
var err error
if params.Query != nil {
tokens, err = parseLogQuery(*params.Query)
if err != nil {
return nil, err
}
}
tokens, err = replaceInterestingFields(allFields, tokens)

View File

@ -62,7 +62,7 @@ type Reader interface {
GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError)
TailLogs(ctx context.Context, client *model.LogsTailClient) *model.ApiError
TailLogs(ctx context.Context, client *model.LogsTailClient)
// Connection needed for rules, not ideal but required
GetConn() clickhouse.Conn

View File

@ -335,8 +335,8 @@ type LogsFilterParams struct {
OrderBy string `json:"orderBy"`
Order string `json:"order"`
Query *string `json:"q"`
TimestampStart *int64 `json:"timestampStart"`
TimestampEnd *int64 `json:"timestampEnd"`
TimestampStart *uint64 `json:"timestampStart"`
TimestampEnd *uint64 `json:"timestampEnd"`
IdStart *string `json:"idStart"`
IdEnd *string `json:"idEnd"`
}

View File

@ -437,7 +437,9 @@ type GetLogsResponse struct {
}
type LogsTailClient struct {
Name string
Logs chan *string
Done chan *bool
Name string
Logs chan *GetLogsResponse
Done chan *bool
Error chan error
Filter LogsFilterParams
}