Fix case sensitivity in query parsing (#1670)

* Fix case sensitivity in query parsing - now the parser correctly recognize fields which contains uppercase letters

* fix: logs parser respects the case of fields

Co-authored-by: nityanandagohain <nityanandagohain@gmail.com>
Co-authored-by: Pranay Prateek <pranay@signoz.io>
Co-authored-by: Ankit Nayan <ankit@signoz.io>
This commit is contained in:
Zsombor 2022-12-10 14:57:57 +01:00 committed by GitHub
parent 16170eacc0
commit c38d1c150d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 18 deletions

View File

@ -198,8 +198,6 @@ func parseLogQuery(query string) ([]string, error) {
} }
func parseColumn(s string) (*string, error) { func parseColumn(s string) (*string, error) {
s = strings.ToLower(s)
colName := "" colName := ""
// if has and/or as prefix // if has and/or as prefix
@ -208,7 +206,8 @@ func parseColumn(s string) (*string, error) {
return nil, fmt.Errorf("incorrect filter") return nil, fmt.Errorf("incorrect filter")
} }
if strings.HasPrefix(s, AND) || strings.HasPrefix(s, OR) { first := strings.ToLower(filter[0])
if first == AND || first == OR {
colName = filter[1] colName = filter[1]
} else { } else {
colName = filter[0] colName = filter[0]
@ -231,27 +230,37 @@ func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens []
interestingFieldLookup := arrayToMap(allFields.Interesting) interestingFieldLookup := arrayToMap(allFields.Interesting)
for index := 0; index < len(queryTokens); index++ { for index := 0; index < len(queryTokens); index++ {
queryToken := queryTokens[index] result, err := replaceFieldInToken(queryTokens[index], selectedFieldsLookup, interestingFieldLookup)
col, err := parseColumn(queryToken)
if err != nil { if err != nil {
return nil, err return nil, err
} }
queryTokens[index] = result
sqlColName := *col
if _, ok := selectedFieldsLookup[*col]; !ok && *col != "body" {
if field, ok := interestingFieldLookup[*col]; ok {
if field.Type != constants.Static {
sqlColName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), *col)
}
} else if strings.Compare(strings.ToLower(*col), "fulltext") != 0 && field.Type != constants.Static {
return nil, fmt.Errorf("field not found for filtering")
}
}
queryTokens[index] = strings.Replace(queryToken, *col, sqlColName, 1)
} }
return queryTokens, nil return queryTokens, nil
} }
func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]model.LogField, interestingFieldLookup map[string]model.LogField) (string, error) {
col, err := parseColumn(queryToken)
if err != nil {
return "", err
}
sqlColName := *col
lowerColName := strings.ToLower(*col)
if lowerColName != "body" {
if _, ok := selectedFieldsLookup[sqlColName]; !ok {
if field, ok := interestingFieldLookup[sqlColName]; ok {
if field.Type != constants.Static {
sqlColName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name)
}
} else if strings.Compare(strings.ToLower(*col), "fulltext") != 0 && field.Type != constants.Static {
return "", fmt.Errorf("field not found for filtering")
}
}
}
return strings.Replace(queryToken, *col, sqlColName, 1), nil
}
func CheckIfPrevousPaginateAndModifyOrder(params *model.LogsFilterParams) (isPaginatePrevious bool) { func CheckIfPrevousPaginateAndModifyOrder(params *model.LogsFilterParams) (isPaginatePrevious bool) {
if params.IdGt != "" && params.OrderBy == TIMESTAMP && params.Order == DESC { if params.IdGt != "" && params.OrderBy == TIMESTAMP && params.Order == DESC {
isPaginatePrevious = true isPaginatePrevious = true

View File

@ -161,6 +161,26 @@ var parseCorrectColumns = []struct {
"and id_userid >= 50 ", "and id_userid >= 50 ",
"id_userid", "id_userid",
}, },
{
"column starting with and",
"andor = 1",
"andor",
},
{
"column starting with and after an 'and'",
"and andor = 1",
"andor",
},
{
"column starting with And",
"Andor = 1",
"Andor",
},
{
"column starting with and after an 'and'",
"and Andor = 1",
"Andor",
},
{ {
"column with ilike", "column with ilike",
`AND body ILIKE '%searchstring%' `, `AND body ILIKE '%searchstring%' `,
@ -279,7 +299,7 @@ var generateSQLQueryFields = model.GetFieldsResponse{
Type: "attributes", Type: "attributes",
}, },
{ {
Name: "field2", Name: "Field2",
DataType: "double64", DataType: "double64",
Type: "attributes", Type: "attributes",
}, },
@ -290,6 +310,11 @@ var generateSQLQueryFields = model.GetFieldsResponse{
}, },
}, },
Interesting: []model.LogField{ Interesting: []model.LogField{
{
Name: "FielD1",
DataType: "int64",
Type: "attributes",
},
{ {
Name: "code", Name: "code",
DataType: "int64", DataType: "int64",
@ -323,6 +348,15 @@ var generateSQLQueryTestCases = []struct {
}, },
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( field1 < 100 and field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ", SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( field1 < 100 and field1 > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
}, },
{
Name: "generate case sensitive query",
Filter: model.LogsFilterParams{
Query: "field1 lt 100 and FielD1 gt 50 and Field2 gt 10 and code lte 500 and code gte 400",
TimestampStart: uint64(1657689292000),
TimestampEnd: uint64(1657689294000),
},
SqlFilter: "( timestamp >= '1657689292000' and timestamp <= '1657689294000' ) and ( field1 < 100 and attributes_int64_value[indexOf(attributes_int64_key, 'FielD1')] > 50 and Field2 > 10 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 ) ",
},
} }
func TestGenerateSQLQuery(t *testing.T) { func TestGenerateSQLQuery(t *testing.T) {