diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go
index 35e5e1732c..1f45e47115 100644
--- a/pkg/query-service/app/clickhouseReader/options.go
+++ b/pkg/query-service/app/clickhouseReader/options.go
@@ -36,7 +36,7 @@ const (
 	defaultLogAttributeKeysTable  string        = "distributed_logs_attribute_keys"
 	defaultLogResourceKeysTable   string        = "distributed_logs_resource_keys"
 	defaultLogTagAttributeTable   string        = "distributed_tag_attributes"
-	defaultLiveTailRefreshSeconds int           = 10
+	defaultLiveTailRefreshSeconds int           = 5
 	defaultWriteBatchDelay        time.Duration = 5 * time.Second
 	defaultWriteBatchSize         int           = 10000
 	defaultEncoding               Encoding      = EncodingJSON
diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go
index 2bf15b90cd..ac763be193 100644
--- a/pkg/query-service/app/clickhouseReader/reader.go
+++ b/pkg/query-service/app/clickhouseReader/reader.go
@@ -4504,3 +4504,46 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string
 	}
 	return response, nil
 }
+
+func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient) {
+	if timestampStart == 0 {
+		timestampStart = uint64(time.Now().UnixNano())
+	}
+
+	ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			done := true
+			client.Done <- &done
+			zap.S().Debug("closing go routine : " + client.Name)
+			return
+		case <-ticker.C:
+			// fetch only the newest 100 logs; anything older won't make sense for live tail
+			tmpQuery := fmt.Sprintf("timestamp >='%d'", timestampStart)
+			if idStart != "" {
+				tmpQuery = fmt.Sprintf("%s AND id > '%s'", tmpQuery, idStart)
+			}
+			tmpQuery = fmt.Sprintf(query, tmpQuery)
+			// order by desc because we need the latest logs first
+			tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)
+
+			// using the old structure since we can read it directly into the struct and use it
+			response := []model.GetLogsResponse{}
+			err := r.db.Select(ctx, &response, tmpQuery)
+			if err != nil {
+				zap.S().Error(err)
+				client.Error <- err
+				return
+			}
+			for i := len(response) - 1; i >= 0; i-- {
+				client.Logs <- &response[i]
+				if i == 0 {
+					timestampStart = response[i].Timestamp
+					idStart = response[i].ID
+				}
+			}
+		}
+	}
+}
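
Note on the loop above: the `query` argument arrives from the query builder with a single `%s` slot left in its WHERE clause (see the PrepareLiveTailQuery changes later in this diff), and on every tick the reader splices an advancing timestamp/id cursor into that slot before appending the ordering and limit. A minimal, self-contained sketch of that string composition follows; the base template and cursor values are illustrative stand-ins, not real builder output.

package main

import (
	"fmt"
	"time"
)

func main() {
	// illustrative template shaped like the live-tail query builder output:
	// the single %s is reserved for the incremental live-tail filter
	base := "SELECT timestamp, id, body from signoz_logs.distributed_logs where %s AND severity_text = 'ERROR'"

	// cursor state carried across ticks (hypothetical values)
	timestampStart := uint64(time.Now().UnixNano())
	idStart := "2F5u4kqaWRPrXZzxRpgjEBnAVNT"

	// the same composition performed in the ticker branch of LiveTailLogsV3
	tmpQuery := fmt.Sprintf("timestamp >='%d'", timestampStart)
	if idStart != "" {
		tmpQuery = fmt.Sprintf("%s AND id > '%s'", tmpQuery, idStart)
	}
	tmpQuery = fmt.Sprintf(base, tmpQuery)
	tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)

	fmt.Println(tmpQuery)
}
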
diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index c32ae0d426..e53e9af91a 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -272,6 +272,9 @@ func (aH *APIHandler) RegisterQueryRangeV3Routes(router *mux.Router, am *AuthMid
 	subRouter.HandleFunc("/autocomplete/attribute_values", am.ViewAccess(
 		withCacheControl(AutoCompleteCacheControlAge, aH.autoCompleteAttributeValues))).Methods(http.MethodGet)
 	subRouter.HandleFunc("/query_range", am.ViewAccess(aH.QueryRangeV3)).Methods(http.MethodPost)
+
+	// live logs
+	subRouter.HandleFunc("/logs/livetail", am.ViewAccess(aH.liveTailLogs)).Methods(http.MethodPost)
 }
 
 func (aH *APIHandler) Respond(w http.ResponseWriter, data interface{}) {
@@ -2903,3 +2906,79 @@ func applyMetricLimit(results []*v3.Result, queryRangeParams *v3.QueryRangeParam
 		}
 	}
 }
+
+func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
+
+	queryRangeParams, apiErrorObj := ParseQueryRangeParams(r)
+	if apiErrorObj != nil {
+		zap.S().Errorf(apiErrorObj.Err.Error())
+		RespondError(w, apiErrorObj, nil)
+		return
+	}
+
+	var err error
+	var queryString string
+	switch queryRangeParams.CompositeQuery.QueryType {
+	case v3.QueryTypeBuilder:
+		// check if any enrichment is required for logs; if yes, enrich them
+		if logsv3.EnrichmentRequired(queryRangeParams) {
+			// get the fields if any logs query is present
+			var fields map[string]v3.AttributeKey
+			fields, err = aH.getLogFieldsV3(r.Context(), queryRangeParams)
+			if err != nil {
+				apiErrObj := &model.ApiError{Typ: model.ErrorInternal, Err: err}
+				RespondError(w, apiErrObj, nil)
+				return
+			}
+			logsv3.Enrich(queryRangeParams, fields)
+		}
+
+		queryString, err = aH.queryBuilder.PrepareLiveTailQuery(queryRangeParams)
+		if err != nil {
+			RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+			return
+		}
+
+	default:
+		err = fmt.Errorf("invalid query type")
+		RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
+		return
+	}
+
+	// create the client
+	client := &v3.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error)}
+	go aH.reader.LiveTailLogsV3(r.Context(), queryString, uint64(queryRangeParams.Start), "", client)
+
+	w.Header().Set("Connection", "keep-alive")
+	w.Header().Set("Content-Type", "text/event-stream")
+	w.Header().Set("Cache-Control", "no-cache")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	w.WriteHeader(200)
+
+	flusher, ok := w.(http.Flusher)
+	if !ok {
+		err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil}
+		RespondError(w, &err, "streaming is not supported")
+		return
+	}
+	// flush the headers
+	flusher.Flush()
+	for {
+		select {
+		case log := <-client.Logs:
+			var buf bytes.Buffer
+			enc := json.NewEncoder(&buf)
+			enc.Encode(log)
+			fmt.Fprintf(w, "event: log\ndata: %v\n\n", buf.String())
+			flusher.Flush()
+		case <-client.Done:
+			zap.S().Debug("done!")
+			return
+		case err := <-client.Error:
+			zap.S().Error("error occurred!", err)
+			fmt.Fprintf(w, "event: error\ndata: %v\n\n", err.Error())
+			flusher.Flush()
+			return
+		}
+	}
+}
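
Because the handler streams results as server-sent events, a consumer only needs a plain HTTP client that keeps the response body open and reads the `event:`/`data:` lines. Below is a rough sketch of such a consumer; the host, URL prefix, and request payload are assumptions for illustration and would need to match an actual SigNoz deployment and the v3 query-range request schema.

package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	// hypothetical request body: a single "noop" builder query over logs
	body := strings.NewReader(`{"start":1680066360726,"end":1680066458000,"step":60,"compositeQuery":{"queryType":"builder","panelType":"list","builderQueries":{"A":{"queryName":"A","dataSource":"logs","expression":"A","aggregateOperator":"noop"}}}}`)

	// hypothetical URL; the route registered above is "<v3 prefix>/logs/livetail"
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/api/v3/logs/livetail", body)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// each log arrives as an "event: log" line followed by a "data: {...}" line
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) // allow large log lines
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "data: ") {
			fmt.Println(strings.TrimPrefix(line, "data: "))
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}
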
diff --git a/pkg/query-service/app/logs/v3/query_builder.go b/pkg/query-service/app/logs/v3/query_builder.go
index 79c92e3810..b592f684e9 100644
--- a/pkg/query-service/app/logs/v3/query_builder.go
+++ b/pkg/query-service/app/logs/v3/query_builder.go
@@ -289,6 +289,27 @@ func buildLogsQuery(panelType v3.PanelType, start, end, step int64, mq *v3.Build
 	}
 }
 
+func buildLogsLiveTailQuery(mq *v3.BuilderQuery) (string, error) {
+	filterSubQuery, err := buildLogsTimeSeriesFilterQuery(mq.Filters, mq.GroupBy)
+	if err != nil {
+		return "", err
+	}
+
+	switch mq.AggregateOperator {
+	case v3.AggregateOperatorNoOp:
+		queryTmpl := constants.LogsSQLSelect + "from signoz_logs.distributed_logs where %s"
+		if len(filterSubQuery) == 0 {
+			filterSubQuery = "%s"
+		} else {
+			filterSubQuery = "%s " + filterSubQuery
+		}
+		query := fmt.Sprintf(queryTmpl, filterSubQuery)
+		return query, nil
+	default:
+		return "", fmt.Errorf("unsupported aggregate operator in live tail")
+	}
+}
+
 // groupBy returns a string of comma separated tags for group by clause
 // `ts` is always added to the group by clause
 func groupBy(panelType v3.PanelType, graphLimitQtype string, tags ...string) string {
@@ -384,26 +405,36 @@ func addOffsetToQuery(query string, offset uint64) string {
 	return fmt.Sprintf("%s OFFSET %d", query, offset)
 }
 
-func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, graphLimitQtype string) (string, error) {
+type Options struct {
+	GraphLimitQtype string
+	IsLivetailQuery bool
+}
 
-	if graphLimitQtype == constants.FirstQueryGraphLimit {
+func PrepareLogsQuery(start, end int64, queryType v3.QueryType, panelType v3.PanelType, mq *v3.BuilderQuery, options Options) (string, error) {
+	if options.IsLivetailQuery {
+		query, err := buildLogsLiveTailQuery(mq)
+		if err != nil {
+			return "", err
+		}
+		return query, nil
+	} else if options.GraphLimitQtype == constants.FirstQueryGraphLimit {
 		// give me just the groupby names
-		query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, graphLimitQtype)
+		query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
 		if err != nil {
 			return "", err
 		}
 		query = addLimitToQuery(query, mq.Limit)
 
 		return query, nil
-	} else if graphLimitQtype == constants.SecondQueryGraphLimit {
-		query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, graphLimitQtype)
+	} else if options.GraphLimitQtype == constants.SecondQueryGraphLimit {
+		query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
 		if err != nil {
 			return "", err
 		}
 		return query, nil
 	}
 
-	query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, graphLimitQtype)
+	query, err := buildLogsQuery(panelType, start, end, mq.StepInterval, mq, options.GraphLimitQtype)
 	if err != nil {
 		return "", err
 	}
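
One detail of buildLogsLiveTailQuery worth spelling out: the `%s` slot for the live-tail cursor filter always comes first, and the user's filter (which, per the expected queries in the tests below, arrives with a leading AND) is appended after it, so the final WHERE clause reads `where <cursor filter> AND <user filter>`. A small, self-contained illustration, using a shortened stand-in for constants.LogsSQLSelect and a hand-written filter string:

package main

import "fmt"

func main() {
	// shortened stand-in for constants.LogsSQLSelect
	logsSQLSelect := "SELECT timestamp, id, body "

	queryTmpl := logsSQLSelect + "from signoz_logs.distributed_logs where %s"

	// empty string mimics a query without filters; the second entry mimics method = 'GET'
	for _, filterSubQuery := range []string{
		"",
		"AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET'",
	} {
		if len(filterSubQuery) == 0 {
			filterSubQuery = "%s"
		} else {
			filterSubQuery = "%s " + filterSubQuery
		}
		// the remaining %s is filled later, at stream time, by LiveTailLogsV3
		query := fmt.Sprintf(queryTmpl, filterSubQuery)
		fmt.Println(query)
	}
}
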
diff --git a/pkg/query-service/app/logs/v3/query_builder_test.go b/pkg/query-service/app/logs/v3/query_builder_test.go
index 0253a0b21f..6c47c5421d 100644
--- a/pkg/query-service/app/logs/v3/query_builder_test.go
+++ b/pkg/query-service/app/logs/v3/query_builder_test.go
@@ -989,7 +989,7 @@ var testPrepLogsQueryData = []struct {
 	TableName         string
 	AggregateOperator v3.AggregateOperator
 	ExpectedQuery     string
-	Type              string
+	Options           Options
 }{
 	{
 		Name:      "Test TS with limit- first",
@@ -1011,7 +1011,7 @@ var testPrepLogsQueryData = []struct {
 		},
 		TableName:     "logs",
 		ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by value DESC) LIMIT 10",
-		Type:          constants.FirstQueryGraphLimit,
+		Options:       Options{GraphLimitQtype: constants.FirstQueryGraphLimit},
 	},
 	{
 		Name:      "Test TS with limit- first - with order by value",
@@ -1034,7 +1034,7 @@ var testPrepLogsQueryData = []struct {
 		},
 		TableName:     "logs",
 		ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by value ASC) LIMIT 10",
-		Type:          constants.FirstQueryGraphLimit,
+		Options:       Options{GraphLimitQtype: constants.FirstQueryGraphLimit},
 	},
 	{
 		Name:      "Test TS with limit- first - with order by attribute",
@@ -1057,7 +1057,7 @@ var testPrepLogsQueryData = []struct {
 		},
 		TableName:     "logs",
 		ExpectedQuery: "SELECT method from (SELECT attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 group by method order by method ASC) LIMIT 10",
-		Type:          constants.FirstQueryGraphLimit,
+		Options:       Options{GraphLimitQtype: constants.FirstQueryGraphLimit},
 	},
 	{
 		Name:      "Test TS with limit- second",
@@ -1079,7 +1079,7 @@ var testPrepLogsQueryData = []struct {
 		},
 		TableName:     "logs",
 		ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 0 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 AND (method) IN (%s) group by method,ts order by value DESC",
-		Type:          constants.SecondQueryGraphLimit,
+		Options:       Options{GraphLimitQtype: constants.SecondQueryGraphLimit},
 	},
 	{
 		Name:      "Test TS with limit- second - with order by",
@@ -1102,14 +1102,50 @@ var testPrepLogsQueryData = []struct {
 		},
 		TableName:     "logs",
 		ExpectedQuery: "SELECT toStartOfInterval(fromUnixTimestamp64Nano(timestamp), INTERVAL 0 SECOND) AS ts, attributes_string_value[indexOf(attributes_string_key, 'method')] as method, toFloat64(count(distinct(attributes_string_value[indexOf(attributes_string_key, 'name')]))) as value from signoz_logs.distributed_logs where (timestamp >= 1680066360726210000 AND timestamp <= 1680066458000000000) AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET' AND indexOf(attributes_string_key, 'method') > 0 AND (method) IN (%s) group by method,ts order by method ASC",
method ASC", - Type: constants.SecondQueryGraphLimit, + Options: Options{GraphLimitQtype: constants.SecondQueryGraphLimit}, + }, + // Live tail + { + Name: "Live Tail Query", + PanelType: v3.PanelTypeList, + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "method", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag}, Value: "GET", Operator: "="}, + }, + }, + }, + TableName: "logs", + ExpectedQuery: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string from signoz_logs.distributed_logs where %s AND attributes_string_value[indexOf(attributes_string_key, 'method')] = 'GET'", + Options: Options{IsLivetailQuery: true}, + }, + { + Name: "Live Tail Query W/O filter", + PanelType: v3.PanelTypeList, + Start: 1680066360726210000, + End: 1680066458000000000, + Step: 60, + BuilderQuery: &v3.BuilderQuery{ + QueryName: "A", + AggregateOperator: v3.AggregateOperatorNoOp, + Expression: "A", + Filters: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}}, + }, + TableName: "logs", + ExpectedQuery: "SELECT timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body,CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string,CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64,CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64,CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string from signoz_logs.distributed_logs where %s", + Options: Options{IsLivetailQuery: true}, }, } func TestPrepareLogsQuery(t *testing.T) { for _, tt := range testPrepLogsQueryData { Convey("TestBuildLogsQuery", t, func() { - query, err := PrepareLogsQuery(tt.Start, tt.End, "", tt.PanelType, tt.BuilderQuery, tt.Type) + query, err := PrepareLogsQuery(tt.Start, tt.End, "", tt.PanelType, tt.BuilderQuery, tt.Options) So(err, ShouldBeNil) So(query, ShouldEqual, tt.ExpectedQuery) diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index 7284bddc9f..ea77fca4f3 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -235,7 +235,7 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa // TODO: add support for logs and traces if builderQuery.DataSource == v3.DataSourceLogs { - query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, "") + query, err := logsV3.PrepareLogsQuery(params.Start, params.End, params.CompositeQuery.QueryType, params.CompositeQuery.PanelType, builderQuery, logsV3.Options{}) if err != nil { errQueriesByName[queryName] = err.Error() continue diff --git a/pkg/query-service/app/queryBuilder/query_builder.go 
diff --git a/pkg/query-service/app/queryBuilder/query_builder.go b/pkg/query-service/app/queryBuilder/query_builder.go
index c3f8b0d4d0..fb6c8211bc 100644
--- a/pkg/query-service/app/queryBuilder/query_builder.go
+++ b/pkg/query-service/app/queryBuilder/query_builder.go
@@ -5,6 +5,7 @@ import (
 	"strings"
 
 	"github.com/SigNoz/govaluate"
+	logsV3 "go.signoz.io/signoz/pkg/query-service/app/logs/v3"
 	"go.signoz.io/signoz/pkg/query-service/cache"
 	"go.signoz.io/signoz/pkg/query-service/constants"
 	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
@@ -40,7 +41,7 @@ var SupportedFunctions = []string{
 var EvalFuncs = map[string]govaluate.ExpressionFunction{}
 
 type prepareTracesQueryFunc func(start, end int64, panelType v3.PanelType, bq *v3.BuilderQuery, keys map[string]v3.AttributeKey, graphLimitQtype string) (string, error)
-type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, graphLimitQtype string) (string, error)
+type prepareLogsQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery, options logsV3.Options) (string, error)
 type prepareMetricQueryFunc func(start, end int64, queryType v3.QueryType, panelType v3.PanelType, bq *v3.BuilderQuery) (string, error)
 
 type QueryBuilder struct {
@@ -131,6 +132,29 @@ func expressionToQuery(qp *v3.QueryRangeParamsV3, varToQuery map[string]string,
 	return formulaQuery, nil
 }
 
+func (qb *QueryBuilder) PrepareLiveTailQuery(params *v3.QueryRangeParamsV3) (string, error) {
+	var queryStr string
+	var err error
+	compositeQuery := params.CompositeQuery
+
+	if compositeQuery != nil {
+		// There can only be a single query, and there is no concept of disabling queries
+		if len(compositeQuery.BuilderQueries) != 1 {
+			return "", fmt.Errorf("live tail is only supported for single query")
+		}
+		for queryName, query := range compositeQuery.BuilderQueries {
+			if query.Expression == queryName {
+				queryStr, err = qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{IsLivetailQuery: true})
+				if err != nil {
+					return "", err
+				}
+			}
+		}
+
+	}
+	return queryStr, nil
+}
+
 func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...interface{}) (map[string]string, error) {
 	queries := make(map[string]string)
 
@@ -169,18 +193,18 @@ func (qb *QueryBuilder) PrepareQueries(params *v3.QueryRangeParamsV3, args ...in
 			case v3.DataSourceLogs:
 				// for ts query with limit replace it as it is already formed
 				if compositeQuery.PanelType == v3.PanelTypeGraph && query.Limit > 0 && len(query.GroupBy) > 0 {
-					limitQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, constants.FirstQueryGraphLimit)
+					limitQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.FirstQueryGraphLimit})
 					if err != nil {
 						return nil, err
 					}
-					placeholderQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, constants.SecondQueryGraphLimit)
+					placeholderQuery, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{GraphLimitQtype: constants.SecondQueryGraphLimit})
 					if err != nil {
 						return nil, err
 					}
 					query := fmt.Sprintf(placeholderQuery, limitQuery)
 					queries[queryName] = query
 				} else {
-					queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, "")
+					queryString, err := qb.options.BuildLogQuery(params.Start, params.End, compositeQuery.QueryType, compositeQuery.PanelType, query, logsV3.Options{})
 					if err != nil {
 						return nil, err
 					}
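
The two-phase graph-limit path above relies on the same placeholder trick as live tail: the second ("placeholder") query carries an `IN (%s)` clause, and the SQL of the first ("limit") query is substituted into it verbatim, turning the limit query into a sub-select. Below is a stripped-down, self-contained illustration of that final composition; the two SQL fragments are shortened stand-ins for the real builder output, not actual generated queries.

package main

import "fmt"

func main() {
	// stand-in for the first query: returns only the group-by values to keep
	limitQuery := "SELECT method from (SELECT method, toFloat64(count()) as value from signoz_logs.distributed_logs group by method order by value DESC) LIMIT 10"

	// stand-in for the second query: keeps a %s slot inside the IN (...) clause
	placeholderQuery := "SELECT ts, method, toFloat64(count()) as value from signoz_logs.distributed_logs where (method) IN (%s) group by method,ts order by value DESC"

	// same composition as PrepareQueries: the limit query becomes a sub-select
	query := fmt.Sprintf(placeholderQuery, limitQuery)
	fmt.Println(query)
}
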
diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go
index 638de6e4d8..f53a3a1fe6 100644
--- a/pkg/query-service/interfaces/interface.go
+++ b/pkg/query-service/interfaces/interface.go
@@ -69,6 +69,7 @@ type Reader interface {
 	// QB V3 metrics/traces/logs
 	GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error)
 	GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error)
+	LiveTailLogsV3(ctx context.Context, query string, timestampStart uint64, idStart string, client *v3.LogsLiveTailClient)
 
 	GetTotalSpans(ctx context.Context) (uint64, error)
 	GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)
diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go
index 7647e5a75d..d1dab19c0d 100644
--- a/pkg/query-service/model/v3/v3.go
+++ b/pkg/query-service/model/v3/v3.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"go.signoz.io/signoz/pkg/query-service/model"
 )
 
 type DataSource string
@@ -584,6 +585,13 @@ type Result struct {
 	List []*Row `json:"list"`
 }
 
+type LogsLiveTailClient struct {
+	Name  string
+	Logs  chan *model.GetLogsResponse
+	Done  chan *bool
+	Error chan error
+}
+
 type Series struct {
 	Labels map[string]string `json:"labels"`
 	Points []Point           `json:"values"`
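
Finally, a compact sketch of the producer/consumer contract that LogsLiveTailClient encodes: the reader goroutine pushes rows into Logs (buffered to 1000 entries in the handler above), signals Done when the request context ends, and reports a fatal query error on Error, after which it stops. The fake producer below is a stand-in for LiveTailLogsV3 and the struct is a simplified local copy; only the channel protocol is the point.

package main

import (
	"fmt"
	"time"
)

// simplified stand-ins for model.GetLogsResponse and v3.LogsLiveTailClient
type logRow struct {
	Timestamp uint64
	Body      string
}

type liveTailClient struct {
	Logs  chan *logRow
	Done  chan *bool
	Error chan error
}

func fakeProducer(client *liveTailClient) {
	for i := 0; i < 3; i++ {
		client.Logs <- &logRow{Timestamp: uint64(time.Now().UnixNano()), Body: fmt.Sprintf("log line %d", i)}
	}
	done := true
	client.Done <- &done
}

func main() {
	// Logs is unbuffered here so the demo drains every row before Done;
	// the real handler buffers 1000 entries instead
	client := &liveTailClient{Logs: make(chan *logRow), Done: make(chan *bool), Error: make(chan error)}
	go fakeProducer(client)

	// consumer loop shaped like the handler's select: drain Logs until Done or Error
	for {
		select {
		case row := <-client.Logs:
			fmt.Println(row.Body)
		case <-client.Done:
			fmt.Println("stream finished")
			return
		case err := <-client.Error:
			fmt.Println("stream failed:", err)
			return
		}
	}
}
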