primitive type pointers removed

nityanandagohain 2022-07-22 16:49:40 +05:30
parent 373cbbc375
commit 2f17898390
5 changed files with 84 additions and 90 deletions
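
The pattern repeated across all five files: optional fields move from pointer types (*string, *uint64, *int), where "unset" was nil and every use required a dereference, to plain value types, where "unset" is the Go zero value ("" or 0). A minimal sketch of the two styles; FilterBefore and FilterAfter are illustrative names, not types from this codebase:

package main

import "fmt"

// FilterBefore mirrors the old style: pointers encode "optional".
type FilterBefore struct {
    Query          *string
    TimestampStart *uint64
}

// FilterAfter mirrors the new style: the zero value encodes "optional".
type FilterAfter struct {
    Query          string
    TimestampStart uint64
}

func main() {
    q := "error"
    before := FilterBefore{Query: &q} // a named variable exists only so its address can be taken

    if before.Query != nil {
        fmt.Println("before:", *before.Query) // dereference at every use site
    }

    after := FilterAfter{Query: "error"} // literals assign directly
    if after.Query != "" {               // the nil check becomes a zero-value check
        fmt.Println("after:", after.Query)
    }
}

The tradeoff, visible in the parser and model changes below, is that a value a caller explicitly sets to "" or 0 is now indistinguishable from one that was never set.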

View File

@@ -2825,15 +2825,13 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
     }

     // create the index
-    if field.IndexType == nil {
-        iType := constants.DefaultLogSkipIndexType
-        field.IndexType = &iType
+    if field.IndexType == "" {
+        field.IndexType = constants.DefaultLogSkipIndexType
     }
-    if field.IndexGranularity == nil {
-        granularity := constants.DefaultLogSkipIndexGranularity
-        field.IndexGranularity = &granularity
+    if field.IndexGranularity == 0 {
+        field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
     }
-    query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, *field.IndexType, *field.IndexGranularity)
+    query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, field.IndexType, field.IndexGranularity)
     err := r.db.Exec(ctx, query)
     if err != nil {
         return &model.ApiError{Err: err, Typ: model.ErrorInternal}
@@ -2897,13 +2895,13 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
     query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable)

     tsStart := uint64(time.Now().UnixNano())
-    if client.Filter.TimestampStart != nil {
-        tsStart = *client.Filter.TimestampStart
+    if client.Filter.TimestampStart != 0 {
+        tsStart = client.Filter.TimestampStart
     }

     var idStart string
-    if client.Filter.IdStart != nil {
-        idStart = *client.Filter.IdStart
+    if client.Filter.IdStart != "" {
+        idStart = client.Filter.IdStart
     }

     for {
@@ -2954,14 +2952,9 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
 func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) {
     logAggregatesDBResponseItems := []model.LogsAggregatesDBResponseItem{}

-    groupBy := ""
-    if params.GroupBy != nil {
-        groupBy = *params.GroupBy
-    }
-
     function := "toFloat64(count()) as value"
-    if params.Function != nil {
-        function = fmt.Sprintf("toFloat64(%s) as value", *params.Function)
+    if params.Function != "" {
+        function = fmt.Sprintf("toFloat64(%s) as value", params.Function)
     }

     fields, apiErr := r.GetLogFields(ctx)
@@ -2977,22 +2970,22 @@ func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.Logs
     }

     query := ""
-    if groupBy != "" {
+    if params.GroupBy != "" {
         query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, toString(%s) as groupBy, "+
             "%s "+
             "FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ",
-            *params.StepSeconds/60, groupBy, function, r.logsDB, r.logsTable, *params.TimestampStart, *params.TimestampEnd)
+            params.StepSeconds/60, params.GroupBy, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd)
     } else {
         query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, "+
             "%s "+
             "FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ",
-            *params.StepSeconds/60, function, r.logsDB, r.logsTable, *params.TimestampStart, *params.TimestampEnd)
+            params.StepSeconds/60, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd)
     }
     if filterSql != "" {
         query += fmt.Sprintf(" AND %s ", filterSql)
     }
-    if groupBy != "" {
-        query += fmt.Sprintf("GROUP BY time, toString(%s) as groupBy ORDER BY time", groupBy)
+    if params.GroupBy != "" {
+        query += fmt.Sprintf("GROUP BY time, toString(%s) as groupBy ORDER BY time", params.GroupBy)
     } else {
         query += "GROUP BY time ORDER BY time"
     }
@@ -3009,17 +3002,17 @@ func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.Logs
     for i := range logAggregatesDBResponseItems {
         if elem, ok := aggregateResponse.Items[int64(logAggregatesDBResponseItems[i].Timestamp)]; ok {
-            if groupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" {
+            if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" {
                 elem.GroupBy[logAggregatesDBResponseItems[i].GroupBy] = logAggregatesDBResponseItems[i].Value
             }
             aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = elem
         } else {
-            if groupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" {
+            if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" {
                 aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{
                     Timestamp: logAggregatesDBResponseItems[i].Timestamp,
                     GroupBy:   map[string]interface{}{logAggregatesDBResponseItems[i].GroupBy: logAggregatesDBResponseItems[i].Value},
                 }
-            } else if groupBy == "" {
+            } else if params.GroupBy == "" {
                 aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{
                     Timestamp: logAggregatesDBResponseItems[i].Timestamp,
                     Value:     logAggregatesDBResponseItems[i].Value,
                 }

View File

@@ -22,7 +22,13 @@ var operatorMapping = map[string]string{
 }

 const (
-    AND = "and"
+    AND             = "and"
+    ORDER           = "order"
+    ORDER_BY        = "orderBy"
+    TIMESTAMP_START = "timestampStart"
+    TIMESTAMP_END   = "timestampEnd"
+    IDSTART         = "idStart"
+    IDEND           = "idEnd"
 )

 var tokenRegex, _ = regexp.Compile(`(?i)(and( )*?)?(([\w.-]+ (in|nin) \([\S ]+\))|([\w.]+ (gt|lt|gte|lte) (')?[\S]+(')?)|([\w.]+ (contains|ncontains)) (')?[\S ]+(')?)`)
@@ -42,36 +48,34 @@ func ParseLogFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
             return nil, err
         }
     }
-    if val, ok := params["orderBy"]; ok {
+    if val, ok := params[ORDER_BY]; ok {
         res.OrderBy = val[0]
     }
-    if val, ok := params["order"]; ok {
+    if val, ok := params[ORDER]; ok {
         res.Order = val[0]
     }
     if val, ok := params["q"]; ok {
-        res.Query = &val[0]
+        res.Query = val[0]
     }
-    if val, ok := params["timestampStart"]; ok {
+    if val, ok := params[TIMESTAMP_START]; ok {
         ts, err := strconv.Atoi(val[0])
         if err != nil {
             return nil, err
         }
-        ts64 := uint64(ts)
-        res.TimestampStart = &ts64
+        res.TimestampStart = uint64(ts)
     }
-    if val, ok := params["timestampEnd"]; ok {
+    if val, ok := params[TIMESTAMP_END]; ok {
         ts, err := strconv.Atoi(val[0])
         if err != nil {
             return nil, err
         }
-        ts64 := uint64(ts)
-        res.TimestampEnd = &ts64
+        res.TimestampEnd = uint64(ts)
     }
-    if val, ok := params["idStart"]; ok {
-        res.IdStart = &val[0]
+    if val, ok := params[IDSTART]; ok {
+        res.IdStart = val[0]
     }
-    if val, ok := params["idEnd"]; ok {
-        res.IdEnd = &val[0]
+    if val, ok := params[IDEND]; ok {
+        res.IdEnd = val[0]
     }
     return &res, nil
 }
@@ -80,18 +84,17 @@ func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error)
     res := model.LogsFilterParams{}
     params := r.URL.Query()
     if val, ok := params["q"]; ok {
-        res.Query = &val[0]
+        res.Query = val[0]
     }
-    if val, ok := params["timestampStart"]; ok {
+    if val, ok := params[TIMESTAMP_START]; ok {
         ts, err := strconv.Atoi(val[0])
         if err != nil {
             return nil, err
         }
-        ts64 := uint64(ts)
-        res.TimestampStart = &ts64
+        res.TimestampStart = uint64(ts)
     }
-    if val, ok := params["idStart"]; ok {
-        res.IdStart = &val[0]
+    if val, ok := params[IDSTART]; ok {
+        res.IdStart = val[0]
     }
     return &res, nil
 }
@@ -99,37 +102,35 @@ func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error)

 func ParseLogAggregateParams(r *http.Request) (*model.LogsAggregateParams, error) {
     res := model.LogsAggregateParams{}
     params := r.URL.Query()
-    if val, ok := params["timestampStart"]; ok {
+    if val, ok := params[TIMESTAMP_START]; ok {
         ts, err := strconv.Atoi(val[0])
         if err != nil {
             return nil, err
         }
-        ts64 := uint64(ts)
-        res.TimestampStart = &ts64
+        res.TimestampStart = uint64(ts)
     } else {
         return nil, fmt.Errorf("timestampStart is required")
     }

-    if val, ok := params["timestampEnd"]; ok {
+    if val, ok := params[TIMESTAMP_END]; ok {
         ts, err := strconv.Atoi(val[0])
         if err != nil {
             return nil, err
         }
-        ts64 := uint64(ts)
-        res.TimestampEnd = &ts64
+        res.TimestampEnd = uint64(ts)
     } else {
         return nil, fmt.Errorf("timestampEnd is required")
     }

     if val, ok := params["q"]; ok {
-        res.Query = &val[0]
+        res.Query = val[0]
     }
     if val, ok := params["groupBy"]; ok {
-        res.GroupBy = &val[0]
+        res.GroupBy = val[0]
     }
     if val, ok := params["function"]; ok {
-        res.Function = &val[0]
+        res.Function = val[0]
     }

     if val, ok := params["step"]; ok {
@@ -137,7 +138,7 @@ func ParseLogAggregateParams(r *http.Request) (*model.LogsAggregateParams, error
         if err != nil {
             return nil, err
         }
-        res.StepSeconds = &step
+        res.StepSeconds = step
     } else {
         return nil, fmt.Errorf("step is required")
     }
@@ -246,8 +247,8 @@ func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilt
     var tokens []string
     var err error
     var sqlWhere string
-    if params.Query != nil {
-        tokens, err = parseLogQuery(*params.Query)
+    if params.Query != "" {
+        tokens, err = parseLogQuery(params.Query)
         if err != nil {
             return sqlWhere, err
         }
@@ -258,29 +259,29 @@ func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilt
         return sqlWhere, err
     }

-    if params.TimestampStart != nil {
-        filter := fmt.Sprintf("timestamp >= '%d' ", *params.TimestampStart)
+    if params.TimestampStart != 0 {
+        filter := fmt.Sprintf("timestamp >= '%d' ", params.TimestampStart)
         if len(tokens) > 0 {
             filter = "and " + filter
         }
         tokens = append(tokens, filter)
     }
-    if params.TimestampEnd != nil {
-        filter := fmt.Sprintf("timestamp <= '%d' ", *params.TimestampEnd)
+    if params.TimestampEnd != 0 {
+        filter := fmt.Sprintf("timestamp <= '%d' ", params.TimestampEnd)
         if len(tokens) > 0 {
             filter = "and " + filter
         }
         tokens = append(tokens, filter)
     }
-    if params.IdStart != nil {
-        filter := fmt.Sprintf("id > '%v' ", *params.IdStart)
+    if params.IdStart != "" {
+        filter := fmt.Sprintf("id > '%v' ", params.IdStart)
         if len(tokens) > 0 {
             filter = "and " + filter
         }
         tokens = append(tokens, filter)
     }
-    if params.IdEnd != nil {
-        filter := fmt.Sprintf("id < '%v' ", *params.IdEnd)
+    if params.IdEnd != "" {
+        filter := fmt.Sprintf("id < '%v' ", params.IdEnd)
         if len(tokens) > 0 {
             filter = "and " + filter
         }

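Since r.URL.Query() returns a url.Values map, a parameter absent from the URL never sets its struct field at all, which is what makes the zero value a workable "not provided" sentinel for the checks in GenerateSQLWhere above. A standalone sketch of that behaviour, assuming only the standard library:

package main

import (
    "fmt"
    "net/url"
    "strconv"
)

func main() {
    // The same shape of data ParseLogFilterParams reads from r.URL.Query();
    // timestampEnd is deliberately absent here.
    params, _ := url.ParseQuery("timestampStart=1657689292000")

    var tsStart, tsEnd uint64
    if val, ok := params["timestampStart"]; ok {
        if ts, err := strconv.Atoi(val[0]); err == nil {
            tsStart = uint64(ts)
        }
    }
    if val, ok := params["timestampEnd"]; ok {
        if ts, err := strconv.Atoi(val[0]); err == nil {
            tsEnd = uint64(ts)
        }
    }

    // tsEnd stayed 0, so a where-clause builder in this style emits
    // only the lower bound.
    fmt.Println(tsStart != 0, tsEnd != 0) // true false
}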
View File

@@ -213,7 +213,7 @@ func TestGenerateSQLQuery(t *testing.T) {
     idEnd := "2BsKG6tRpFWjYMcWsAGKfSxoQdU"
     sqlWhere := "id < 100 and id > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 and timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' "
     Convey("testInterestingFields", t, func() {
-        res, _ := GenerateSQLWhere(&allFields, &model.LogsFilterParams{Query: &query, TimestampStart: &tsStart, TimestampEnd: &tsEnd, IdStart: &idStart, IdEnd: &idEnd})
+        res, _ := GenerateSQLWhere(&allFields, &model.LogsFilterParams{Query: query, TimestampStart: tsStart, TimestampEnd: tsEnd, IdStart: idStart, IdEnd: idEnd})
         So(res, ShouldEqual, sqlWhere)
     })
 }

View File

@@ -27,13 +27,13 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error {
         return fmt.Errorf("type %s not supported", field.Type)
     }

-    if field.IndexType != nil {
-        matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, *field.IndexType)
+    if field.IndexType != "" {
+        matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, field.IndexType)
         if err != nil {
             return err
         }
         if !matched {
-            return fmt.Errorf("index type %s not supported", *field.IndexType)
+            return fmt.Errorf("index type %s not supported", field.IndexType)
         }
     }
     return nil

View File

@@ -322,30 +322,30 @@ type FilterSet struct {
 }

 type UpdateField struct {
-    Name             string  `json:"name"`
-    DataType         string  `json:"dataType"`
-    Type             string  `json:"type"`
-    Selected         bool    `json:"selected"`
-    IndexType        *string `json:"index"`
-    IndexGranularity *int    `json:"indexGranularity"`
+    Name             string `json:"name"`
+    DataType         string `json:"dataType"`
+    Type             string `json:"type"`
+    Selected         bool   `json:"selected"`
+    IndexType        string `json:"index"`
+    IndexGranularity int    `json:"indexGranularity"`
 }

 type LogsFilterParams struct {
-    Limit          int     `json:"limit"`
-    OrderBy        string  `json:"orderBy"`
-    Order          string  `json:"order"`
-    Query          *string `json:"q"`
-    TimestampStart *uint64 `json:"timestampStart"`
-    TimestampEnd   *uint64 `json:"timestampEnd"`
-    IdStart        *string `json:"idStart"`
-    IdEnd          *string `json:"idEnd"`
+    Limit          int    `json:"limit"`
+    OrderBy        string `json:"orderBy"`
+    Order          string `json:"order"`
+    Query          string `json:"q"`
+    TimestampStart uint64 `json:"timestampStart"`
+    TimestampEnd   uint64 `json:"timestampEnd"`
+    IdStart        string `json:"idStart"`
+    IdEnd          string `json:"idEnd"`
 }

 type LogsAggregateParams struct {
-    Query          *string `json:"q"`
-    TimestampStart *uint64 `json:"timestampStart"`
-    TimestampEnd   *uint64 `json:"timestampEnd"`
-    GroupBy        *string `json:"groupBy"`
-    Function       *string `json:"function"`
-    StepSeconds    *int    `json:"step"`
+    Query          string `json:"q"`
+    TimestampStart uint64 `json:"timestampStart"`
+    TimestampEnd   uint64 `json:"timestampEnd"`
+    GroupBy        string `json:"groupBy"`
+    Function       string `json:"function"`
+    StepSeconds    int    `json:"step"`
 }
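
One consequence of the struct changes: with value types, encoding/json leaves an absent key at the field's zero value, so a request body that omits timestampStart decodes identically to one that sends 0, which is exactly the contract the == 0 and == "" checks above rely on. A small demonstration using a hypothetical two-field mirror of LogsFilterParams, not the actual model:

package main

import (
    "encoding/json"
    "fmt"
)

// filterParams mirrors two LogsFilterParams fields purely for illustration.
type filterParams struct {
    Query          string `json:"q"`
    TimestampStart uint64 `json:"timestampStart"`
}

func main() {
    var omitted, explicit filterParams
    json.Unmarshal([]byte(`{}`), &omitted)                           // keys absent
    json.Unmarshal([]byte(`{"q":"","timestampStart":0}`), &explicit) // keys present, zero values

    fmt.Println(omitted == explicit) // true: the two decode identically
}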