feat: live tail API with query range support (#3170)

* feat: live tail API with query range support

* fix: minor fixes

* feat: minor fixes

* feat: send error event back to client

---------

Co-authored-by: Palash Gupta <palashgdev@gmail.com>
Nityananda Gohain 2023-07-20 17:53:55 +05:30 committed by GitHub
parent ef0e63c35b
commit c4ce057d7a
9 changed files with 241 additions and 19 deletions

View File

@ -36,7 +36,7 @@ const (
defaultLogAttributeKeysTable string = "distributed_logs_attribute_keys"
defaultLogResourceKeysTable string = "distributed_logs_resource_keys"
defaultLogTagAttributeTable string = "distributed_tag_attributes"
defaultLiveTailRefreshSeconds int = 10
defaultLiveTailRefreshSeconds int = 5
defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000
defaultEncoding Encoding = EncodingJSON

View File

@ -4504,3 +4504,46 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string
}
return response, nil
}
func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient) {
if timestampStart == 0 {
timestampStart = uint64(time.Now().UnixNano())
}
ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
done := true
client.Done <- &done
zap.S().Debug("closing goroutine: " + client.Name)
return
case <-ticker.C:
// fetch only the newest 100 logs, since anything older won't make sense here
tmpQuery := fmt.Sprintf("timestamp >='%d'", timestampStart)
if idStart != "" {
tmpQuery = fmt.Sprintf("%s AND id > '%s'", tmpQuery, idStart)
}
tmpQuery = fmt.Sprintf(query, tmpQuery)
// order by desc because we need the latest logs first
tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)
// using the old response structure since we can read the rows directly into the struct and use it
response := []model.GetLogsResponse{}
err := r.db.Select(ctx, &response, tmpQuery)
if err != nil {
zap.S().Error(err)
client.Error <- err
return
}
for i := len(response) - 1; i >= 0; i-- {
client.Logs <- &response[i]
if i == 0 {
timestampStart = response[i].Timestamp
idStart = response[i].ID
}
}
}
}
}
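For reference, the query string passed into LiveTailLogsV3 carries a %s placeholder where the moving cursor goes. A minimal sketch (not part of this commit) of how that placeholder is filled on each tick, using made-up cursor values and a trimmed-down SELECT:

package main

import "fmt"

func main() {
	// Prepared query as produced by the query builder; the %s is left for the cursor.
	prepared := "SELECT timestamp, id, body from signoz_logs.distributed_logs where %s AND severity_text = 'ERROR'"

	// Moving cursor, advanced to the newest returned row after each poll (made-up values).
	timestampStart := uint64(1689849835000000000)
	idStart := "2RNQzxLHKYsFDbcy"

	cursor := fmt.Sprintf("timestamp >='%d'", timestampStart)
	if idStart != "" {
		cursor = fmt.Sprintf("%s AND id > '%s'", cursor, idStart)
	}
	query := fmt.Sprintf(prepared, cursor)
	query = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", query)
	fmt.Println(query)
}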

View File

@ -272,6 +272,9 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid
subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(
withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeValues))).Methods(http.MethodGet)
subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV3)).Methods(http.MethodPost)
// live logs
subRouter.HandleFunc("/logs/livetail", am.ViewAccess(aH.liveTailLogs)).Methods(http.MethodPost)
}
func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
@ -2903,3 +2906,79 @@ func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParam
}
}
}
func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
if apiErrorObj != nil {
zap.S().Error(apiErrorObj.Err.Error())
RespondError(w, apiErrorObj, nil)
return
}
var err error
var queryString string
switch queryRangeParams.CompositeQuery.QueryType {
case v3.QueryTypeBuilder:
// check if any enrichment is required for logs; if yes, enrich them
if logsv3.EnrichmentRequired(queryRangeParams) {
// get the fields if any logs query is present
var fields map[string]v3.AttributeKey
fields, err = aH.getLogFieldsV3(r.Context(), queryRangeParams)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
RespondError(w, apiErrObj, nil)
return
}
logsv3.Enrich(queryRangeParams, fields)
}
queryString, err = aH.queryBuilder.PrepareLiveTailQuery(queryRangeParams)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
default:
err = fmt.Errorf("invalid query type")
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
// create the client
client := &v3.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error)}
go aH.reader.LiveTailLogsV3(r.Context(), queryString, uint64(queryRangeParams.Start), "", client)
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil}
RespondError(w, &err, "streaming is not supported")
return
}
w.WriteHeader(http.StatusOK)
// flush the headers so the client starts receiving the stream right away
flusher.Flush()
for {
select {
case log := <-client.Logs:
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.Encode(log)
fmt.Fprintf(w, "event: log\ndata: %v\n\n", buf.String())
flusher.Flush()
case <-client.Done:
zap.S().Debug("done!")
return
case err := <-client.Error:
zap.S().Error("error occurred!", err)
fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error())
flusher.Flush()
return
}
}
}
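The handler above streams server-sent events ("event: log" and "event: error"). A minimal consumer sketch, assuming the v3 subrouter is mounted under /api/v3 on the default query-service port 8080 (neither is shown in this hunk) and a hypothetical request body that follows the v3 query-range payload:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Hypothetical request body; field names follow the v3 query-range payload
	// used elsewhere in SigNoz, but verify them against your version.
	body := strings.NewReader(`{"start":1689849835000,"end":1689853435000,"compositeQuery":{"queryType":"builder","panelType":"list","builderQueries":{"A":{"queryName":"A","dataSource":"logs","expression":"A","aggregateOperator":"noop"}}}}`)

	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/api/v3/logs/livetail", body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Events arrive as "event: <name>" / "data: <payload>" pairs separated by
	// blank lines; print each data payload as it streams in.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data: ") {
			fmt.Println(strings.TrimPrefix(line, "data: "))
		}
	}
}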

View File

@ -289,6 +289,27 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
}
}
func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) {
filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy)
if err != nil {
return "", err
}
switch mq.AggregateOperator {
case v3.AggregateOperatorNoOp:
queryTmpl := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where %s"
if len(filterSubQuery) == 0 {
filterSubQuery = "%s"
} else {
filterSubQuery = "%s " + filterSubQuery
}
query := fmt.Sprintf(queryTmpl, filterSubQuery)
return query, nil
default:
return "", fmt.Errorf("unsupported aggregate operator in live tail")
}
}
// groupBy returns a string of comma separated tags for group by clause
// `ts` is always added to the group by clause
func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) string {
@ -384,26 +405,36 @@ func addOffsetToQuery(query string, offset uint64) string {
return fmt.Sprintf("%s OFFSET %d", query, offset)
}
func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, graphLimitQtype string) (string, error) {
if graphLimitQtype == constants.FirstQueryGraphLimit {
type Options struct {
GraphLimitQtype string
IsLivetailQuery bool
}
func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options Options) (string, error) {
if options.IsLivetailQuery {
query, err := buildLogsLiveTailQuery(mq)
if err != nil {
return "", err
}
return query, nil
} else if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
// give me just the groupby names
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, graphLimitQtype)
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
if err != nil {
return "", err
}
query = addLimitToQuery(query, mq.Limit)
return query, nil
} else if graphLimitQtype == constants.SecondQueryGraphLimit {
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, graphLimitQtype)
} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
if err != nil {
return "", err
}
return query, nil
}
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, graphLimitQtype)
query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
if err != nil {
return "", err
}
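Callers now select the code path through the Options struct instead of a bare graphLimitQtype string. A minimal sketch of the live tail path, assuming the SigNoz import paths shown in later hunks of this commit; start and end are irrelevant here because the reader injects its own cursor at poll time:

package main

import (
	"fmt"

	logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func main() {
	mq := &v3.BuilderQuery{
		QueryName:         "A",
		AggregateOperator: v3.AggregateOperatorNoOp,
		Expression:        "A",
		Filters:           &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}},
	}

	// Live tail ignores start/end; the reader fills the %s placeholder with its
	// own timestamp/id cursor on every poll.
	query, err := logsV3.PrepareLogsQuery(0, 0, v3.QueryTypeBuilder, v3.PanelTypeList,
		mq, logsV3.Options{IsLivetailQuery: true})
	if err != nil {
		panic(err)
	}
	fmt.Println(query)
}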

View File

@ -989,7 +989,7 @@ var testPrepLogsQueryData = []struct {
TableName string
AggregateOperator v3.AggregateOperator
ExpectedQuery string
Type string
Options Options
}{
{
Name: "Test TS with limit- first",
@ -1011,7 +1011,7 @@ var testPrepLogsQueryData = []struct {
},
TableName: "logs",
ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by value DESC) LIMIT 10",
Type: constants.FirstQueryGraphLimit,
Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit},
},
{
Name: "Test TS with limit- first - with order by value",
@ -1034,7 +1034,7 @@ var testPrepLogsQueryData = []struct {
},
TableName: "logs",
ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by value ASC) LIMIT 10",
Type: constants.FirstQueryGraphLimit,
Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit},
},
{
Name: "Test TS with limit- first - with order by attribute",
@ -1057,7 +1057,7 @@ var testPrepLogsQueryData = []struct {
},
TableName: "logs",
ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by method ASC) LIMIT 10",
Type: constants.FirstQueryGraphLimit,
Options: Options{GraphLimitQtype: constants.FirstQueryGraphLimit},
},
{
Name: "Test TS with limit- second",
@ -1079,7 +1079,7 @@ var testPrepLogsQueryData = []struct {
},
TableName: "logs",
ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 0 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 AND (method) IN (%s) group by method,ts order by value DESC",
Type: constants.SecondQueryGraphLimit,
Options: Options{GraphLimitQtype: constants.SecondQueryGraphLimit},
},
{
Name: "Test TS with limit- second - with order by",
@ -1102,14 +1102,50 @@ var testPrepLogsQueryData = []struct {
},
TableName: "logs",
ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 0 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 AND (method) IN (%s) group by method,ts order by method ASC",
Type: constants.SecondQueryGraphLimit,
Options: Options{GraphLimitQtype: constants.SecondQueryGraphLimit},
},
// Live tail
{
Name: "Live Tail Query",
PanelType: v3.PanelTypeList,
Start: 1680066360726210000,
End: 1680066458000000000,
Step: 60,
BuilderQuery: &v3.BuilderQuery{
QueryName: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
Expression: "A",
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="},
},
},
},
TableName: "logs",
ExpectedQuery: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string from signoz_logs.distributed_logs where %s AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET'",
Options: Options{IsLivetailQuery: true},
},
{
Name: "Live Tail Query W/O filter",
PanelType: v3.PanelTypeList,
Start: 1680066360726210000,
End: 1680066458000000000,
Step: 60,
BuilderQuery: &v3.BuilderQuery{
QueryName: "A",
AggregateOperator: v3.AggregateOperatorNoOp,
Expression: "A",
Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}},
},
TableName: "logs",
ExpectedQuery: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string from signoz_logs.distributed_logs where %s",
Options: Options{IsLivetailQuery: true},
},
}
func TestPrepareLogsQuery(t *testing.T) {
for _, tt := range testPrepLogsQueryData {
Convey("TestBuildLogsQuery", t, func() {
query, err := PrepareLogsQuery(tt.Start, tt.End, "", tt.PanelType, tt.BuilderQuery, tt.Type)
query, err := PrepareLogsQuery(tt.Start, tt.End, "", tt.PanelType, tt.BuilderQuery, tt.Options)
So(err, ShouldBeNil)
So(query, ShouldEqual, tt.ExpectedQuery)

View File

@ -235,7 +235,7 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
// TODO: add support for logs and traces
if builderQuery.DataSource == v3.DataSourceLogs {
query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, "")
query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, logsV3.Options{})
if err != nil {
errQueriesByName[queryName] = err.Error()
continue

View File

@ -5,6 +5,7 @@ import (
"strings"
"github.com/SigNoz/govaluate"
logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
"go.signoz.io/signoz/pkg/query-service/cache"
"go.signoz.io/signoz/pkg/query-service/constants"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@ -40,7 +41,7 @@ var SupportedFunctions = []string{
var EvalFuncs = map[string]govaluate.ExpressionFunction{}
type prepareTracesQueryFunc func(start, end int64, panelType v3.PanelType, bq *v3.BuilderQuery, keys map[string]v3.AttributeKey, graphLimitQtype string) (string, error)
type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, graphLimitQtype string) (string, error)
type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options logsV3.Options) (string, error)
type prepareMetricQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error)
type QueryBuilder struct {
@ -131,6 +132,29 @@ func expressionToQuery(qp *v3.QueryRangeParamsV3, varToQuery map[string]string,
return formulaQuery, nil
}
func (qb *QueryBuilder) PrepareLiveTailQuery(params *v3.QueryRangeParamsV3) (string, error) {
var queryStr string
var err error
compositeQuery := params.CompositeQuery
if compositeQuery != nil {
// There can only be a single query and there is no concept of disabling queries
if len(compositeQuery.BuilderQueries) != 1 {
return "", fmt.Errorf("live tail is only supported for single query")
}
for queryName, query := range compositeQuery.BuilderQueries {
if query.Expression == queryName {
queryStr, err = qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{IsLivetailQuery: true})
if err != nil {
return "", err
}
}
}
}
return queryStr, nil
}
func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...interface{}) (map[string]string, error) {
queries := make(map[string]string)
@ -169,18 +193,18 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
case v3.DataSourceLogs:
// for ts query with limit replace it as it is already formed
if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 {
limitQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, constants.FirstQueryGraphLimit)
limitQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit})
if err != nil {
return nil, err
}
placeholderQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, constants.SecondQueryGraphLimit)
placeholderQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit})
if err != nil {
return nil, err
}
query := fmt.Sprintf(placeholderQuery, limitQuery)
queries[queryName] = query
} else {
queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, "")
queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{})
if err != nil {
return nil, err
}
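For context, the graph-limit branch above still works in two passes: the first query yields the top group values, and the second query carries a %s placeholder that the first query is spliced into via fmt.Sprintf. A sketch with trimmed-down stand-in SQL strings (not the real generated queries):

package main

import "fmt"

func main() {
	// Stand-ins for the two generated queries: the first pass returns the top-N
	// group values, the second pass carries a %s placeholder for them.
	limitQuery := "SELECT method from (SELECT method, count() as value from logs group by method order by value DESC) LIMIT 10"
	placeholderQuery := "SELECT ts, method, value from logs where (method) IN (%s) group by method, ts"

	query := fmt.Sprintf(placeholderQuery, limitQuery)
	fmt.Println(query)
}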

View File

@ -69,6 +69,7 @@ type Reader interface {
// QB V3 metrics/traces/logs
GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error)
GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error)
LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient)
GetTotalSpans(ctx context.Context) (uint64, error)
GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/google/uuid"
"go.signoz.io/signoz/pkg/query-service/model"
)
type DataSource string
@ -584,6 +585,13 @@ type Result struct {
List []*Row `json:"list"`
}
type LogsLiveTailClient struct {
Name string
Logs chan *model.GetLogsResponse
Done chan *bool
Error chan error
}
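A minimal sketch of the producer/consumer contract these channels encode: the reader goroutine produces rows, the HTTP handler consumes them, and context cancellation (the client disconnecting) ends the tail. The types below are stand-ins, not the real SigNoz types:

package main

import (
	"context"
	"fmt"
	"time"
)

type logRow struct{ Body string }

type liveTailClient struct {
	Logs  chan *logRow
	Done  chan *bool
	Error chan error
}

// produce mimics the reader side: poll on a ticker, push rows, and signal Done
// when the request context is cancelled.
func produce(ctx context.Context, c *liveTailClient) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			done := true
			c.Done <- &done
			return
		case <-ticker.C:
			c.Logs <- &logRow{Body: "new log line"}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	c := &liveTailClient{Logs: make(chan *logRow, 1000), Done: make(chan *bool), Error: make(chan error)}
	go produce(ctx, c)

	// The consumer side fans the three channels into the response stream.
	for {
		select {
		case l := <-c.Logs:
			fmt.Println("event: log,", l.Body)
		case <-c.Done:
			fmt.Println("tail closed")
			return
		case err := <-c.Error:
			fmt.Println("tail failed:", err)
			return
		}
	}
}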
type Series struct {
Labels map[string]string `json:"labels"`
Points []Point `json:"values"`