mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 12:18:58 +08:00
chore: pass through substitutions for CH query (#7389)
This commit is contained in:
parent
8f095dfbc9
commit
88be23c3e3
1
go.mod
1
go.mod
@ -6,6 +6,7 @@ toolchain go1.22.7
|
||||
|
||||
require (
|
||||
dario.cat/mergo v1.0.1
|
||||
github.com/AfterShip/clickhouse-sql-parser v0.4.4
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
|
||||
|
6
go.sum
6
go.sum
@ -64,6 +64,8 @@ dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
|
||||
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
|
||||
github.com/AfterShip/clickhouse-sql-parser v0.4.4 h1:iLRwjzz1mWmUEf5UNrSYOceQ+PX9SdBJ8Xw0DNrL114=
|
||||
github.com/AfterShip/clickhouse-sql-parser v0.4.4/go.mod h1:W0Z82wJWkJxz2RVun/RMwxue3g7ut47Xxl+SFqdJGus=
|
||||
github.com/Azure/azure-sdk-for-go v68.0.0+incompatible h1:fcYLmCpyNYRnvJbPerq7U0hS+6+I79yEDJBqVNcqUzU=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0=
|
||||
@ -827,10 +829,14 @@ github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30 h1:yoKAVkEVwAqbGbR8n87rHQ1dul
|
||||
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.30/go.mod h1:sH0u6fq6x4R5M7WxkoQFY/o7UaiItec0o1LinLCJNq8=
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
|
||||
github.com/sebdah/goldie/v2 v2.5.3 h1:9ES/mNN+HNUbNWpVAlrzuZ7jE+Nrczbj8uFRjM7624Y=
|
||||
github.com/sebdah/goldie/v2 v2.5.3/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI=
|
||||
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
|
||||
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
|
||||
github.com/segmentio/backo-go v1.0.1 h1:68RQccglxZeyURy93ASB/2kc9QudzgIDexJ927N++y4=
|
||||
github.com/segmentio/backo-go v1.0.1/go.mod h1:9/Rh6yILuLysoQnZ2oNooD2g7aBnvM7r/fNVxRNWfBc=
|
||||
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
|
||||
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
|
||||
github.com/sethvargo/go-password v0.2.0 h1:BTDl4CC/gjf/axHMaDQtw507ogrXLci6XRiLc7i/UHI=
|
||||
github.com/sethvargo/go-password v0.2.0/go.mod h1:Ym4Mr9JXLBycr02MFuVQ/0JHidNetSgbzutTr3zsYXE=
|
||||
github.com/shirou/gopsutil/v4 v4.24.9 h1:KIV+/HaHD5ka5f570RZq+2SaeFsb/pq+fp2DGNWYoOI=
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"github.com/gorilla/mux"
|
||||
promModel "github.com/prometheus/common/model"
|
||||
"go.uber.org/multierr"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/metrics"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/app/queryBuilder"
|
||||
@ -34,6 +35,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/query-service/utils"
|
||||
querytemplate "github.com/SigNoz/signoz/pkg/query-service/utils/queryTemplate"
|
||||
"github.com/SigNoz/signoz/pkg/types"
|
||||
chVariables "github.com/SigNoz/signoz/pkg/variables/clickhouse"
|
||||
)
|
||||
|
||||
var allowedFunctions = []string{"count", "ratePerSec", "sum", "avg", "min", "max", "p50", "p90", "p95", "p99"}
|
||||
@ -841,6 +843,29 @@ func validateExpressions(expressions []string, funcs map[string]govaluate.Expres
|
||||
return errs
|
||||
}
|
||||
|
||||
// chTransformQuery transforms the clickhouse query with the given variables
|
||||
// it is used to check what would be the query if variables are selected as __all__.
|
||||
// for now, this is just a pass through, but in the future, we will use it to
|
||||
// dashboard variables
|
||||
// TODO(srikanthccv): version based query replacement
|
||||
func chTransformQuery(query string, variables map[string]interface{}) {
|
||||
varsForTransform := make([]chVariables.VariableValue, 0, len(variables))
|
||||
for name := range variables {
|
||||
varsForTransform = append(varsForTransform, chVariables.VariableValue{
|
||||
Name: name,
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
})
|
||||
}
|
||||
transformer := chVariables.NewQueryTransformer(query, varsForTransform)
|
||||
transformedQuery, err := transformer.Transform()
|
||||
if err != nil {
|
||||
zap.L().Warn("failed to transform clickhouse query", zap.Error(err))
|
||||
}
|
||||
zap.L().Info("transformed clickhouse query", zap.String("transformedQuery", transformedQuery), zap.String("originalQuery", query))
|
||||
}
|
||||
|
||||
func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiError) {
|
||||
|
||||
var queryRangeParams *v3.QueryRangeParamsV3
|
||||
@ -979,6 +1004,7 @@ func ParseQueryRangeParams(r *http.Request) (*v3.QueryRangeParamsV3, *model.ApiE
|
||||
continue
|
||||
}
|
||||
|
||||
chTransformQuery(chQuery.Query, queryRangeParams.Variables)
|
||||
for name, value := range queryRangeParams.Variables {
|
||||
chQuery.Query = strings.Replace(chQuery.Query, fmt.Sprintf("{{%s}}", name), fmt.Sprint(value), -1)
|
||||
chQuery.Query = strings.Replace(chQuery.Query, fmt.Sprintf("[[%s]]", name), fmt.Sprint(value), -1)
|
||||
|
511
pkg/variables/clickhouse/processor.go
Normal file
511
pkg/variables/clickhouse/processor.go
Normal file
@ -0,0 +1,511 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/AfterShip/clickhouse-sql-parser/parser"
|
||||
)
|
||||
|
||||
// FilterAction represents what to do with a filter containing a variable
|
||||
type FilterAction int
|
||||
|
||||
const (
|
||||
// KeepFilter maintains the original filter
|
||||
KeepFilter FilterAction = iota
|
||||
// RemoveFilter completely removes the filter
|
||||
RemoveFilter
|
||||
// ReplaceWithExistsCheck replaces filter with an EXISTS check
|
||||
ReplaceWithExistsCheck
|
||||
)
|
||||
|
||||
// FilterTransformer defines the callback function that decides
|
||||
// what to do with a filter containing a variable
|
||||
type FilterTransformer func(variableName string, expr parser.Expr) FilterAction
|
||||
|
||||
// QueryProcessor handles ClickHouse query modifications
|
||||
type QueryProcessor struct {
|
||||
}
|
||||
|
||||
// NewQueryProcessor creates a new processor
|
||||
func NewQueryProcessor() *QueryProcessor {
|
||||
return &QueryProcessor{}
|
||||
}
|
||||
|
||||
// ProcessQuery finds variables in WHERE clauses and modifies them according to the transformer function
|
||||
func (qp *QueryProcessor) ProcessQuery(query string, transformer FilterTransformer) (string, error) {
|
||||
p := parser.NewParser(query)
|
||||
stmts, err := p.ParseStmts()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to parse query: %w", err)
|
||||
}
|
||||
|
||||
if len(stmts) == 0 {
|
||||
return query, nil
|
||||
}
|
||||
|
||||
// Look for SELECT statements
|
||||
modified := false
|
||||
for i, stmt := range stmts {
|
||||
selectQuery, ok := stmt.(*parser.SelectQuery)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
whereModified, err := qp.processWhereClause(selectQuery, transformer)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if whereModified {
|
||||
modified = true
|
||||
stmts[i] = selectQuery
|
||||
}
|
||||
}
|
||||
|
||||
if !modified {
|
||||
return query, nil
|
||||
}
|
||||
|
||||
// Reconstruct the query
|
||||
var resultBuilder strings.Builder
|
||||
for _, stmt := range stmts {
|
||||
resultBuilder.WriteString(stmt.String())
|
||||
resultBuilder.WriteString(";")
|
||||
}
|
||||
|
||||
return resultBuilder.String(), nil
|
||||
}
|
||||
|
||||
// processWhereClause processes the WHERE clause in a SELECT statement
|
||||
func (qp *QueryProcessor) processWhereClause(selectQuery *parser.SelectQuery, transformer FilterTransformer) (bool, error) {
|
||||
// First, process any subqueries in the FROM clause
|
||||
if selectQuery.From != nil {
|
||||
subQueryModified, err := qp.processFromClauseSubqueries(selectQuery.From, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if subQueryModified {
|
||||
// Mark as modified if any subqueries were modified
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Then process the main WHERE clause
|
||||
if selectQuery.Where == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Process the WHERE expression, which may include subqueries
|
||||
modified := false
|
||||
newExpr, hasChanged, err := qp.transformExpr(selectQuery.Where.Expr, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if hasChanged {
|
||||
modified = true
|
||||
if newExpr == nil {
|
||||
// If the entire WHERE clause is removed
|
||||
selectQuery.Where = nil
|
||||
} else {
|
||||
selectQuery.Where.Expr = newExpr
|
||||
}
|
||||
}
|
||||
|
||||
return modified, nil
|
||||
}
|
||||
|
||||
// processFromClauseSubqueries recursively processes subqueries in the FROM clause
|
||||
func (qp *QueryProcessor) processFromClauseSubqueries(fromClause *parser.FromClause, transformer FilterTransformer) (bool, error) {
|
||||
if fromClause == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return qp.processExprSubqueries(fromClause.Expr, transformer)
|
||||
}
|
||||
|
||||
// processExprSubqueries processes subqueries found in expressions
|
||||
func (qp *QueryProcessor) processExprSubqueries(expr parser.Expr, transformer FilterTransformer) (bool, error) {
|
||||
if expr == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
modified := false
|
||||
|
||||
switch e := expr.(type) {
|
||||
case *parser.SubQuery:
|
||||
// Process the subquery's SELECT statement
|
||||
if e.Select != nil {
|
||||
subQueryModified, err := qp.processWhereClause(e.Select, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if subQueryModified {
|
||||
modified = true
|
||||
}
|
||||
}
|
||||
|
||||
case *parser.BinaryOperation:
|
||||
// Check left and right expressions for subqueries
|
||||
leftModified, err := qp.processExprSubqueries(e.LeftExpr, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
rightModified, err := qp.processExprSubqueries(e.RightExpr, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if leftModified || rightModified {
|
||||
modified = true
|
||||
}
|
||||
|
||||
case *parser.JoinExpr:
|
||||
// Process both sides of the join
|
||||
leftModified, err := qp.processExprSubqueries(e.Left, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
rightModified, err := qp.processExprSubqueries(e.Right, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Process join constraints if any
|
||||
constraintsModified, err := qp.processExprSubqueries(e.Constraints, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if leftModified || rightModified || constraintsModified {
|
||||
modified = true
|
||||
}
|
||||
|
||||
case *parser.TableExpr:
|
||||
// Process any subqueries in the table expression
|
||||
return qp.processExprSubqueries(e.Expr, transformer)
|
||||
|
||||
case *parser.AliasExpr:
|
||||
// Check if the aliased expression contains a subquery
|
||||
return qp.processExprSubqueries(e.Expr, transformer)
|
||||
|
||||
case *parser.FunctionExpr:
|
||||
// Check function parameters for subqueries
|
||||
if e.Params != nil && e.Params.Items != nil {
|
||||
for _, item := range e.Params.Items.Items {
|
||||
itemModified, err := qp.processExprSubqueries(item, transformer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if itemModified {
|
||||
modified = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return modified, nil
|
||||
}
|
||||
|
||||
// transformExpr recursively processes expressions in the WHERE clause
|
||||
func (qp *QueryProcessor) transformExpr(expr parser.Expr, transformer FilterTransformer) (parser.Expr, bool, error) {
|
||||
if expr == nil {
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
// Handle different expression types
|
||||
switch e := expr.(type) {
|
||||
case *parser.SubQuery:
|
||||
// Handle subqueries like "column IN (SELECT...)"
|
||||
if e.Select != nil {
|
||||
modified, err := qp.processWhereClause(e.Select, transformer)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return expr, modified, nil
|
||||
}
|
||||
|
||||
case *parser.BinaryOperation:
|
||||
// Handle IN with a subquery on the right
|
||||
if e.Operation == "IN" || e.Operation == "NOT IN" {
|
||||
_, rightIsSubQuery := e.RightExpr.(*parser.SubQuery)
|
||||
if rightIsSubQuery {
|
||||
// If right side is a subquery, check if left side has variables
|
||||
leftVars := qp.findVariables(e.LeftExpr)
|
||||
if len(leftVars) > 0 {
|
||||
// Apply action to the entire IN clause
|
||||
action := transformer(leftVars[0], expr)
|
||||
switch action {
|
||||
case RemoveFilter:
|
||||
return nil, true, nil
|
||||
case ReplaceWithExistsCheck:
|
||||
return qp.createExistsCheck(expr, leftVars[0])
|
||||
}
|
||||
}
|
||||
|
||||
// Process the subquery separately (regardless of whether we modified based on left side)
|
||||
newRight, rightChanged, err := qp.transformExpr(e.RightExpr, transformer)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if rightChanged {
|
||||
return &parser.BinaryOperation{
|
||||
LeftExpr: e.LeftExpr,
|
||||
Operation: e.Operation,
|
||||
RightExpr: newRight,
|
||||
HasGlobal: e.HasGlobal,
|
||||
HasNot: e.HasNot,
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
// If no changes, return the original
|
||||
return expr, false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check if this specific binary operation directly contains a variable
|
||||
leftVars := qp.findVariables(e.LeftExpr)
|
||||
rightVars := qp.findVariables(e.RightExpr)
|
||||
|
||||
// If this is a direct filter with a variable (e.g., "column = $var")
|
||||
// and not a complex expression, handle it directly
|
||||
if len(leftVars) > 0 && len(rightVars) == 0 &&
|
||||
!qp.isComplexExpression(e.LeftExpr) && !qp.isComplexExpression(e.RightExpr) {
|
||||
action := transformer(leftVars[0], expr)
|
||||
switch action {
|
||||
case RemoveFilter:
|
||||
return nil, true, nil
|
||||
case ReplaceWithExistsCheck:
|
||||
return qp.createExistsCheck(expr, leftVars[0])
|
||||
}
|
||||
} else if len(rightVars) > 0 && len(leftVars) == 0 &&
|
||||
!qp.isComplexExpression(e.LeftExpr) && !qp.isComplexExpression(e.RightExpr) {
|
||||
action := transformer(rightVars[0], expr)
|
||||
switch action {
|
||||
case RemoveFilter:
|
||||
return nil, true, nil
|
||||
case ReplaceWithExistsCheck:
|
||||
return qp.createExistsCheck(expr, rightVars[0])
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, recursively process left and right sides
|
||||
newLeft, leftChanged, err := qp.transformExpr(e.LeftExpr, transformer)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
newRight, rightChanged, err := qp.transformExpr(e.RightExpr, transformer)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if leftChanged || rightChanged {
|
||||
if e.Operation == "AND" {
|
||||
// For AND operations, if either side is nil (removed), we can simplify
|
||||
if newLeft == nil {
|
||||
return newRight, true, nil
|
||||
}
|
||||
if newRight == nil {
|
||||
return newLeft, true, nil
|
||||
}
|
||||
} else if (newLeft == nil || newRight == nil) &&
|
||||
(e.Operation == "=" || e.Operation == "IN" ||
|
||||
e.Operation == "<" || e.Operation == ">" ||
|
||||
e.Operation == "<=" || e.Operation == ">=") {
|
||||
// For direct comparison operations, if one side is removed, remove the entire expression
|
||||
return nil, true, nil
|
||||
}
|
||||
|
||||
// Create a new binary operation with the modified sides
|
||||
return &parser.BinaryOperation{
|
||||
LeftExpr: newLeft,
|
||||
Operation: e.Operation,
|
||||
RightExpr: newRight,
|
||||
HasGlobal: e.HasGlobal,
|
||||
HasNot: e.HasNot,
|
||||
}, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// For other expression types that may contain variables
|
||||
variables := qp.findVariables(expr)
|
||||
if len(variables) > 0 && !qp.isComplexExpression(expr) {
|
||||
action := transformer(variables[0], expr)
|
||||
switch action {
|
||||
case RemoveFilter:
|
||||
return nil, true, nil
|
||||
case ReplaceWithExistsCheck:
|
||||
return qp.createExistsCheck(expr, variables[0])
|
||||
}
|
||||
}
|
||||
|
||||
return expr, false, nil
|
||||
}
|
||||
|
||||
// isComplexExpression checks if an expression contains nested operations
|
||||
// that should not be treated as a simple variable reference
|
||||
func (qp *QueryProcessor) isComplexExpression(expr parser.Expr) bool {
|
||||
switch e := expr.(type) {
|
||||
case *parser.BinaryOperation:
|
||||
// If it's a binary operation, it's complex
|
||||
return true
|
||||
case *parser.FunctionExpr:
|
||||
// If it's a function, examine its parameters
|
||||
if e.Params != nil && e.Params.Items != nil {
|
||||
for _, item := range e.Params.Items.Items {
|
||||
if qp.isComplexExpression(item) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// findVariables finds all variables in an expression
|
||||
func (qp *QueryProcessor) findVariables(expr parser.Expr) []string {
|
||||
var variables []string
|
||||
|
||||
if expr == nil {
|
||||
return variables
|
||||
}
|
||||
|
||||
switch e := expr.(type) {
|
||||
case *parser.Ident:
|
||||
// we should identify the following ways of using variables
|
||||
// whitespace at the end or beginning of the variable name
|
||||
// should be trimmed
|
||||
// {{.variable_name}}, {{ .variable_name }}, {{ .variable_name}}
|
||||
// $variable_name
|
||||
// [[variable_name]], [[ variable_name]], [[ variable_name ]]
|
||||
// {{variable_name}}, {{ variable_name }}, {{variable_name }}
|
||||
|
||||
if strings.HasPrefix(e.Name, "$") {
|
||||
variables = append(variables, e.Name[1:]) // Remove the $ prefix
|
||||
}
|
||||
|
||||
case *parser.BinaryOperation:
|
||||
variables = append(variables, qp.findVariables(e.LeftExpr)...)
|
||||
variables = append(variables, qp.findVariables(e.RightExpr)...)
|
||||
case *parser.FunctionExpr:
|
||||
if e.Params != nil && e.Params.Items != nil {
|
||||
for _, item := range e.Params.Items.Items {
|
||||
variables = append(variables, qp.findVariables(item)...)
|
||||
}
|
||||
}
|
||||
case *parser.ColumnExpr:
|
||||
variables = append(variables, qp.findVariables(e.Expr)...)
|
||||
case *parser.ParamExprList:
|
||||
if e.Items != nil {
|
||||
for _, item := range e.Items.Items {
|
||||
variables = append(variables, qp.findVariables(item)...)
|
||||
}
|
||||
}
|
||||
case *parser.SelectItem:
|
||||
variables = append(variables, qp.findVariables(e.Expr)...)
|
||||
case *parser.IndexOperation:
|
||||
variables = append(variables, qp.findVariables(e.Object)...)
|
||||
variables = append(variables, qp.findVariables(e.Index)...)
|
||||
}
|
||||
|
||||
return variables
|
||||
}
|
||||
|
||||
// createExistsCheck creates an EXISTS check for a column/map field
|
||||
func (qp *QueryProcessor) createExistsCheck(expr parser.Expr, _ string) (parser.Expr, bool, error) {
|
||||
switch e := expr.(type) {
|
||||
case *parser.BinaryOperation:
|
||||
// Handle map field access like "attributes['http.method'] = $http_method"
|
||||
if indexOp, ok := e.LeftExpr.(*parser.IndexOperation); ok {
|
||||
// Create a "has" function check for maps
|
||||
functionName := &parser.Ident{
|
||||
Name: "has",
|
||||
}
|
||||
|
||||
// Create function parameters with the map and the key
|
||||
params := &parser.ParamExprList{
|
||||
Items: &parser.ColumnExprList{
|
||||
Items: []parser.Expr{
|
||||
indexOp.Object, // The map name (e.g., "attributes")
|
||||
indexOp.Index, // The key (e.g., "'http.method'")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return &parser.FunctionExpr{
|
||||
Name: functionName,
|
||||
Params: params,
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
// Handle direct field comparisons like "field = $variable"
|
||||
if ident, ok := e.LeftExpr.(*parser.Ident); ok && !strings.HasPrefix(ident.Name, "$") {
|
||||
// For regular columns, we might want to check if the column exists or has a non-null value
|
||||
functionName := &parser.Ident{
|
||||
Name: "isNotNull",
|
||||
}
|
||||
|
||||
// Create function parameters
|
||||
params := &parser.ParamExprList{
|
||||
Items: &parser.ColumnExprList{
|
||||
Items: []parser.Expr{
|
||||
ident, // The field name
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return &parser.FunctionExpr{
|
||||
Name: functionName,
|
||||
Params: params,
|
||||
}, true, nil
|
||||
} else if ident, ok := e.RightExpr.(*parser.Ident); ok && !strings.HasPrefix(ident.Name, "$") {
|
||||
// For regular columns, but variable is on the left
|
||||
functionName := &parser.Ident{
|
||||
Name: "isNotNull",
|
||||
}
|
||||
|
||||
params := &parser.ParamExprList{
|
||||
Items: &parser.ColumnExprList{
|
||||
Items: []parser.Expr{
|
||||
ident, // The field name
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return &parser.FunctionExpr{
|
||||
Name: functionName,
|
||||
Params: params,
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
// Handle IN clauses like "field IN ($variables)"
|
||||
if e.Operation == "IN" || e.Operation == "NOT IN" {
|
||||
if ident, ok := e.LeftExpr.(*parser.Ident); ok && !strings.HasPrefix(ident.Name, "$") {
|
||||
// For IN clauses, we might just check if the field exists
|
||||
functionName := &parser.Ident{
|
||||
Name: "isNotNull",
|
||||
}
|
||||
|
||||
params := &parser.ParamExprList{
|
||||
Items: &parser.ColumnExprList{
|
||||
Items: []parser.Expr{
|
||||
ident, // The field name
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return &parser.FunctionExpr{
|
||||
Name: functionName,
|
||||
Params: params,
|
||||
}, true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we couldn't transform it to an EXISTS check, keep the original
|
||||
return expr, false, nil
|
||||
}
|
210
pkg/variables/clickhouse/transfor_test.go
Normal file
210
pkg/variables/clickhouse/transfor_test.go
Normal file
@ -0,0 +1,210 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestTransform(t *testing.T) {
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
sql string
|
||||
variables []VariableValue
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "Example 1: Only service_name is __all__",
|
||||
sql: `SELECT trace_id, name, kind, status
|
||||
FROM signoz_traces.signoz_index_v3
|
||||
WHERE service_name = $service_name
|
||||
AND operation_name IN ($operation_names)
|
||||
AND duration >= $min_duration
|
||||
AND attributes['http.method'] = $http_method
|
||||
AND op IN (SELECT op FROM signoz_traces.operations WHERE service_name = $service_name AND kind = 'server')
|
||||
AND timestamp BETWEEN $start_time AND $end_time`,
|
||||
|
||||
// Define our variables and their values/metadata
|
||||
variables: []VariableValue{
|
||||
{
|
||||
Name: "service_name",
|
||||
Values: []string{"__all__"}, // User selected "__all__"
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "operation_names",
|
||||
Values: []string{"op1", "op2", "op3"},
|
||||
IsSelectAll: false, // User selected specific values
|
||||
FieldType: "array",
|
||||
},
|
||||
{
|
||||
Name: "min_duration",
|
||||
Values: []string{"100"},
|
||||
IsSelectAll: false,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "http_method",
|
||||
Values: []string{"GET", "POST"},
|
||||
IsSelectAll: false,
|
||||
FieldType: "map", // This is a map lookup
|
||||
},
|
||||
{
|
||||
Name: "start_time",
|
||||
Values: []string{"2023-01-01 00:00:00"},
|
||||
IsSelectAll: false,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "end_time",
|
||||
Values: []string{"2023-01-02 00:00:00"},
|
||||
IsSelectAll: false,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
},
|
||||
expected: `SELECT trace_id, name, kind, status FROM signoz_traces.signoz_index_v3 WHERE operation_name IN ($operation_names) AND duration >= $min_duration AND attributes['http.method'] = $http_method AND op IN (SELECT op FROM signoz_traces.operations WHERE kind = 'server') AND timestamp BETWEEN $start_time AND $end_time;`,
|
||||
},
|
||||
{
|
||||
name: "Example 2: Multiple __all__ selections and map lookups",
|
||||
sql: `SELECT trace_id, name, kind, status
|
||||
FROM signoz_traces.signoz_index_v3
|
||||
WHERE service_name = $service_name
|
||||
AND operation_name IN ($operation_names)
|
||||
AND duration >= $min_duration
|
||||
AND attributes['http.method'] = $http_method
|
||||
AND op IN (SELECT op FROM signoz_traces.operations WHERE service_name = $service_name AND kind = 'server')
|
||||
AND timestamp BETWEEN $start_time AND $end_time`,
|
||||
|
||||
variables: []VariableValue{
|
||||
{
|
||||
Name: "service_name",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "operation_names",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "array",
|
||||
},
|
||||
{
|
||||
Name: "min_duration",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "http_method",
|
||||
Values: []string{"GET", "POST"},
|
||||
IsSelectAll: false,
|
||||
FieldType: "map",
|
||||
},
|
||||
{
|
||||
Name: "start_time",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "end_time",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
},
|
||||
expected: `SELECT trace_id, name, kind, status FROM signoz_traces.signoz_index_v3 WHERE attributes['http.method'] = $http_method AND op IN (SELECT op FROM signoz_traces.operations WHERE kind = 'server') AND timestamp BETWEEN $start_time AND $end_time;`,
|
||||
},
|
||||
{
|
||||
name: "Example 3: Multiple __all__ selections and map lookups",
|
||||
sql: `SELECT trace_id, name, kind, status
|
||||
FROM signoz_traces.signoz_index_v3
|
||||
WHERE service_name = $service_name
|
||||
AND operation_name IN ($operation_names)
|
||||
AND duration >= $min_duration
|
||||
AND attributes['http.method'] = $http_method
|
||||
AND op IN (SELECT op FROM signoz_traces.operations WHERE service_name = $service_name AND kind = 'server')
|
||||
AND timestamp BETWEEN $start_time AND $end_time`,
|
||||
|
||||
variables: []VariableValue{
|
||||
{
|
||||
Name: "service_name",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "operation_names",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "array",
|
||||
},
|
||||
{
|
||||
Name: "min_duration",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "http_method",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "map",
|
||||
},
|
||||
},
|
||||
expected: `SELECT trace_id, name, kind, status FROM signoz_traces.signoz_index_v3 WHERE op IN (SELECT op FROM signoz_traces.operations WHERE kind = 'server') AND timestamp BETWEEN $start_time AND $end_time;`,
|
||||
},
|
||||
{
|
||||
name: "Example 3: Multiple __all__ selections and map lookups",
|
||||
sql: `SELECT trace_id, name, kind, status
|
||||
FROM signoz_traces.signoz_index_v3
|
||||
WHERE service_name = {{service_name}}
|
||||
AND operation_name IN {{.operation_names}}
|
||||
AND duration >= {{min_duration}}
|
||||
AND attributes['http.method'] = $http_method
|
||||
AND op IN (SELECT op FROM signoz_traces.operations WHERE service_name = {{service_name}} AND kind = 'server')`,
|
||||
|
||||
variables: []VariableValue{
|
||||
{
|
||||
Name: "service_name",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "operation_names",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "array",
|
||||
},
|
||||
{
|
||||
Name: "min_duration",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "scalar",
|
||||
},
|
||||
{
|
||||
Name: "http_method",
|
||||
Values: []string{"__all__"},
|
||||
IsSelectAll: true,
|
||||
FieldType: "map",
|
||||
},
|
||||
},
|
||||
expected: `SELECT trace_id, name, kind, status FROM signoz_traces.signoz_index_v3 WHERE op IN (SELECT op FROM signoz_traces.operations WHERE kind = 'server');`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
transformer := NewQueryTransformer(testCase.sql, testCase.variables)
|
||||
modifiedQuery, err := transformer.Transform()
|
||||
if err != nil {
|
||||
t.Fatalf("Error transforming query: %v", err)
|
||||
}
|
||||
|
||||
if modifiedQuery != testCase.expected {
|
||||
t.Errorf("Expected transformed query to be:\n%s\nBut got:\n%s", testCase.expected, modifiedQuery)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
74
pkg/variables/clickhouse/transform.go
Normal file
74
pkg/variables/clickhouse/transform.go
Normal file
@ -0,0 +1,74 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/AfterShip/clickhouse-sql-parser/parser"
|
||||
)
|
||||
|
||||
// VariableValue represents a variable's assigned value
|
||||
type VariableValue struct {
|
||||
Name string
|
||||
Values []string
|
||||
IsSelectAll bool
|
||||
FieldType string // "scalar", "array", "map", etc.
|
||||
}
|
||||
|
||||
// QueryTransformer handles the transformation of queries based on variable values
|
||||
type QueryTransformer struct {
|
||||
processor *QueryProcessor
|
||||
variables map[string]VariableValue
|
||||
originalSQL string
|
||||
}
|
||||
|
||||
// NewQueryTransformer creates a new transformer with the given SQL and variables
|
||||
func NewQueryTransformer(sql string, variables []VariableValue) *QueryTransformer {
|
||||
varMap := make(map[string]VariableValue)
|
||||
for _, v := range variables {
|
||||
varMap[v.Name] = v
|
||||
}
|
||||
|
||||
// for each variable, replace the `{{variable_name}}`, [[variable_name]], {{ .variable_name }}, {{.variable_name}}
|
||||
// with $variable_name
|
||||
for name := range varMap {
|
||||
sql = strings.Replace(sql, fmt.Sprintf("{{%s}}", name), fmt.Sprintf("$%s", name), -1)
|
||||
sql = strings.Replace(sql, fmt.Sprintf("[[%s]]", name), fmt.Sprintf("$%s", name), -1)
|
||||
sql = strings.Replace(sql, fmt.Sprintf("{{ .%s }}", name), fmt.Sprintf("$%s", name), -1)
|
||||
sql = strings.Replace(sql, fmt.Sprintf("{{.%s}}", name), fmt.Sprintf("$%s", name), -1)
|
||||
}
|
||||
|
||||
return &QueryTransformer{
|
||||
processor: NewQueryProcessor(),
|
||||
variables: varMap,
|
||||
originalSQL: sql,
|
||||
}
|
||||
}
|
||||
|
||||
// Transform processes the query and returns a transformed version
|
||||
func (t *QueryTransformer) Transform() (string, error) {
|
||||
return t.processor.ProcessQuery(t.originalSQL, t.transformFilter)
|
||||
}
|
||||
|
||||
// transformFilter is the callback function that decides what to do with each filter
|
||||
func (t *QueryTransformer) transformFilter(variableName string, expr parser.Expr) FilterAction {
|
||||
// Check if we have info about this variable
|
||||
varInfo, exists := t.variables[variableName]
|
||||
if !exists {
|
||||
// If we don't have info, keep the filter as is
|
||||
return KeepFilter
|
||||
}
|
||||
|
||||
// If the user selected "__all__", we should remove the filter
|
||||
if varInfo.IsSelectAll {
|
||||
return RemoveFilter
|
||||
}
|
||||
|
||||
// For maps, we might want to check for existence rather than equality
|
||||
if varInfo.FieldType == "map" {
|
||||
return ReplaceWithExistsCheck
|
||||
}
|
||||
|
||||
// Otherwise keep the filter as is (it will be filled with the actual values)
|
||||
return KeepFilter
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user