mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-14 02:35:56 +08:00
feat: allow query restrictions for log queries (#4778)
* feat: allow query restrictions for log queries * fix: error check * fix: set default only if not present * chore: add error log for query restriction error * fix: add limtations for traces * fix: fix wrapper --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
parent
27e412d1ee
commit
389058b9b4
@ -163,7 +163,14 @@ func NewReaderFromClickhouseConnection(
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
wrap := clickhouseConnWrapper{conn: db}
|
wrap := clickhouseConnWrapper{
|
||||||
|
conn: db,
|
||||||
|
settings: ClickhouseQuerySettings{
|
||||||
|
MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"),
|
||||||
|
TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"),
|
||||||
|
MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
return &ClickHouseReader{
|
return &ClickHouseReader{
|
||||||
db: wrap,
|
db: wrap,
|
||||||
@ -4742,7 +4749,7 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
|
|||||||
series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint, LabelsArray: labelsArray[key]}
|
series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint, LabelsArray: labelsArray[key]}
|
||||||
seriesList = append(seriesList, &series)
|
seriesList = append(seriesList, &series)
|
||||||
}
|
}
|
||||||
return seriesList, nil
|
return seriesList, getPersonalisedError(rows.Err())
|
||||||
}
|
}
|
||||||
|
|
||||||
func logComment(ctx context.Context) string {
|
func logComment(ctx context.Context) string {
|
||||||
@ -4833,10 +4840,25 @@ func (r *ClickHouseReader) GetListResultV3(ctx context.Context, query string) ([
|
|||||||
rowList = append(rowList, &v3.Row{Timestamp: t, Data: row})
|
rowList = append(rowList, &v3.Row{Timestamp: t, Data: row})
|
||||||
}
|
}
|
||||||
|
|
||||||
return rowList, nil
|
return rowList, getPersonalisedError(rows.Err())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getPersonalisedError(err error) error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
zap.L().Error("error while reading result", zap.Error(err))
|
||||||
|
if strings.Contains(err.Error(), "code: 307") {
|
||||||
|
return errors.New("query is consuming too much resources, please reach out to the team")
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(err.Error(), "code: 159") {
|
||||||
|
return errors.New("Query is taking too long to run, please reach out to the team")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func removeDuplicateUnderscoreAttributes(row map[string]interface{}) {
|
func removeDuplicateUnderscoreAttributes(row map[string]interface{}) {
|
||||||
if val, ok := row["attributes_int64"]; ok {
|
if val, ok := row["attributes_int64"]; ok {
|
||||||
attributes := val.(*map[string]int64)
|
attributes := val.(*map[string]int64)
|
||||||
|
@ -3,13 +3,21 @@ package clickhouseReader
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/ClickHouse/clickhouse-go/v2"
|
"github.com/ClickHouse/clickhouse-go/v2"
|
||||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ClickhouseQuerySettings struct {
|
||||||
|
MaxExecutionTimeLeaf string
|
||||||
|
TimeoutBeforeCheckingExecutionSpeed string
|
||||||
|
MaxBytesToRead string
|
||||||
|
}
|
||||||
|
|
||||||
type clickhouseConnWrapper struct {
|
type clickhouseConnWrapper struct {
|
||||||
conn clickhouse.Conn
|
conn clickhouse.Conn
|
||||||
|
settings ClickhouseQuerySettings
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) Close() error {
|
func (c clickhouseConnWrapper) Close() error {
|
||||||
@ -24,48 +32,75 @@ func (c clickhouseConnWrapper) Stats() driver.Stats {
|
|||||||
return c.conn.Stats()
|
return c.conn.Stats()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) logComment(ctx context.Context) context.Context {
|
func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query string) context.Context {
|
||||||
|
settings := clickhouse.Settings{}
|
||||||
|
|
||||||
|
logComment := c.getLogComment(ctx)
|
||||||
|
if logComment != "" {
|
||||||
|
settings["log_comment"] = logComment
|
||||||
|
}
|
||||||
|
|
||||||
|
// don't add resource restrictions traces
|
||||||
|
if strings.Contains(query, "signoz_traces") {
|
||||||
|
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.settings.MaxBytesToRead != "" {
|
||||||
|
settings["max_bytes_to_read"] = c.settings.MaxBytesToRead
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.settings.MaxExecutionTimeLeaf != "" {
|
||||||
|
settings["max_execution_time_leaf"] = c.settings.MaxExecutionTimeLeaf
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.settings.TimeoutBeforeCheckingExecutionSpeed != "" {
|
||||||
|
settings["timeout_before_checking_execution_speed"] = c.settings.TimeoutBeforeCheckingExecutionSpeed
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string {
|
||||||
// Get the key-value pairs from context for log comment
|
// Get the key-value pairs from context for log comment
|
||||||
kv := ctx.Value("log_comment")
|
kv := ctx.Value("log_comment")
|
||||||
if kv == nil {
|
if kv == nil {
|
||||||
return ctx
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
logCommentKVs, ok := kv.(map[string]string)
|
logCommentKVs, ok := kv.(map[string]string)
|
||||||
if !ok {
|
if !ok {
|
||||||
return ctx
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
logComment, _ := json.Marshal(logCommentKVs)
|
logComment, _ := json.Marshal(logCommentKVs)
|
||||||
|
|
||||||
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
|
return string(logComment)
|
||||||
"log_comment": logComment,
|
|
||||||
}))
|
|
||||||
return ctx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
|
||||||
return c.conn.Query(c.logComment(ctx), query, args...)
|
return c.conn.Query(c.addClickHouseSettings(ctx, query), query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
|
||||||
return c.conn.QueryRow(c.logComment(ctx), query, args...)
|
return c.conn.QueryRow(c.addClickHouseSettings(ctx, query), query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
|
||||||
return c.conn.Select(c.logComment(ctx), dest, query, args...)
|
return c.conn.Select(c.addClickHouseSettings(ctx, query), dest, query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error {
|
func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error {
|
||||||
return c.conn.Exec(c.logComment(ctx), query, args...)
|
return c.conn.Exec(c.addClickHouseSettings(ctx, query), query, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
|
||||||
return c.conn.AsyncInsert(c.logComment(ctx), query, wait, args...)
|
return c.conn.AsyncInsert(c.addClickHouseSettings(ctx, query), query, wait, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
|
||||||
return c.conn.PrepareBatch(c.logComment(ctx), query, opts...)
|
return c.conn.PrepareBatch(c.addClickHouseSettings(ctx, query), query, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) {
|
func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) {
|
||||||
|
@ -153,6 +153,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu
|
|||||||
return field
|
return field
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// enrich with default values if metadata is not found
|
||||||
if field.Type == "" {
|
if field.Type == "" {
|
||||||
field.Type = v3.AttributeKeyTypeTag
|
field.Type = v3.AttributeKeyTypeTag
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user