From 389058b9b4abec1e69209078240de48e3681afaf Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Wed, 10 Apr 2024 17:25:57 +0530 Subject: [PATCH] feat: allow query restrictions for log queries (#4778) * feat: allow query restrictions for log queries * fix: error check * fix: set default only if not present * chore: add error log for query restriction error * fix: add limtations for traces * fix: fix wrapper --------- Co-authored-by: Srikanth Chekuri --- .../app/clickhouseReader/reader.go | 28 ++++++++- .../app/clickhouseReader/wrapper.go | 63 ++++++++++++++----- pkg/query-service/app/logs/v3/enrich_query.go | 1 + 3 files changed, 75 insertions(+), 17 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 781db7fbae..bae5354244 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -163,7 +163,14 @@ func NewReaderFromClickhouseConnection( os.Exit(1) } - wrap := clickhouseConnWrapper{conn: db} + wrap := clickhouseConnWrapper{ + conn: db, + settings: ClickhouseQuerySettings{ + MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"), + TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"), + MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"), + }, + } return &ClickHouseReader{ db: wrap, @@ -4742,7 +4749,7 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint, LabelsArray: labelsArray[key]} seriesList = append(seriesList, &series) } - return seriesList, nil + return seriesList, getPersonalisedError(rows.Err()) } func logComment(ctx context.Context) string { @@ -4833,10 +4840,25 @@ func (r *ClickHouseReader) GetListResultV3(ctx context.Context, query string) ([ rowList = append(rowList, &v3.Row{Timestamp: t, Data: row}) } - return rowList, nil + return rowList, getPersonalisedError(rows.Err()) } +func getPersonalisedError(err error) error { + if err == nil { + return nil + } + zap.L().Error("error while reading result", zap.Error(err)) + if strings.Contains(err.Error(), "code: 307") { + return errors.New("query is consuming too much resources, please reach out to the team") + } + + if strings.Contains(err.Error(), "code: 159") { + return errors.New("Query is taking too long to run, please reach out to the team") + } + return err +} + func removeDuplicateUnderscoreAttributes(row map[string]interface{}) { if val, ok := row["attributes_int64"]; ok { attributes := val.(*map[string]int64) diff --git a/pkg/query-service/app/clickhouseReader/wrapper.go b/pkg/query-service/app/clickhouseReader/wrapper.go index 2691d961eb..eac6721b4a 100644 --- a/pkg/query-service/app/clickhouseReader/wrapper.go +++ b/pkg/query-service/app/clickhouseReader/wrapper.go @@ -3,13 +3,21 @@ package clickhouseReader import ( "context" "encoding/json" + "strings" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) +type ClickhouseQuerySettings struct { + MaxExecutionTimeLeaf string + TimeoutBeforeCheckingExecutionSpeed string + MaxBytesToRead string +} + type clickhouseConnWrapper struct { - conn clickhouse.Conn + conn clickhouse.Conn + settings ClickhouseQuerySettings } func (c clickhouseConnWrapper) Close() error { @@ -24,48 +32,75 @@ func (c clickhouseConnWrapper) Stats() driver.Stats { return c.conn.Stats() } -func (c clickhouseConnWrapper) logComment(ctx context.Context) context.Context { +func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query string) context.Context { + settings := clickhouse.Settings{} + + logComment := c.getLogComment(ctx) + if logComment != "" { + settings["log_comment"] = logComment + } + + // don't add resource restrictions traces + if strings.Contains(query, "signoz_traces") { + ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings)) + return ctx + } + + if c.settings.MaxBytesToRead != "" { + settings["max_bytes_to_read"] = c.settings.MaxBytesToRead + } + + if c.settings.MaxExecutionTimeLeaf != "" { + settings["max_execution_time_leaf"] = c.settings.MaxExecutionTimeLeaf + } + + if c.settings.TimeoutBeforeCheckingExecutionSpeed != "" { + settings["timeout_before_checking_execution_speed"] = c.settings.TimeoutBeforeCheckingExecutionSpeed + } + + ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings)) + return ctx +} + +func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string { // Get the key-value pairs from context for log comment kv := ctx.Value("log_comment") if kv == nil { - return ctx + return "" } logCommentKVs, ok := kv.(map[string]string) if !ok { - return ctx + return "" } logComment, _ := json.Marshal(logCommentKVs) - ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{ - "log_comment": logComment, - })) - return ctx + return string(logComment) } func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) { - return c.conn.Query(c.logComment(ctx), query, args...) + return c.conn.Query(c.addClickHouseSettings(ctx, query), query, args...) } func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row { - return c.conn.QueryRow(c.logComment(ctx), query, args...) + return c.conn.QueryRow(c.addClickHouseSettings(ctx, query), query, args...) } func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { - return c.conn.Select(c.logComment(ctx), dest, query, args...) + return c.conn.Select(c.addClickHouseSettings(ctx, query), dest, query, args...) } func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error { - return c.conn.Exec(c.logComment(ctx), query, args...) + return c.conn.Exec(c.addClickHouseSettings(ctx, query), query, args...) } func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error { - return c.conn.AsyncInsert(c.logComment(ctx), query, wait, args...) + return c.conn.AsyncInsert(c.addClickHouseSettings(ctx, query), query, wait, args...) } func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) { - return c.conn.PrepareBatch(c.logComment(ctx), query, opts...) + return c.conn.PrepareBatch(c.addClickHouseSettings(ctx, query), query, opts...) } func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) { diff --git a/pkg/query-service/app/logs/v3/enrich_query.go b/pkg/query-service/app/logs/v3/enrich_query.go index 06445d164d..465919360f 100644 --- a/pkg/query-service/app/logs/v3/enrich_query.go +++ b/pkg/query-service/app/logs/v3/enrich_query.go @@ -153,6 +153,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu return field } + // enrich with default values if metadata is not found if field.Type == "" { field.Type = v3.AttributeKeyTypeTag }