aggregate function added

nityanandagohain 2022-07-20 12:11:03 +05:30
parent 051f640100
commit c24bdfc8cf
6 changed files with 172 additions and 0 deletions

View File

@@ -2967,3 +2967,84 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
		}
	}
}
func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) {
	logAggregatesDBResponseItems := &[]model.LogsAggregatesDBResponseItem{}

	groupBy := ""
	if params.GroupBy != nil {
		groupBy = *params.GroupBy
	}

	// Default to a plain count unless the caller supplied an aggregate
	// function; cast to Float64 so every aggregate scans into one type.
	function := "toFloat64(count()) as value"
	if params.Function != nil {
		function = fmt.Sprintf("toFloat64(%s) as value", *params.Function)
	}

	fields, apiErr := r.GetLogFields(ctx)
	if apiErr != nil {
		return nil, apiErr
	}

	filterSql, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{
		Query: params.Query,
	})
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData}
	}

	// Bucket timestamps into step-sized intervals; add the requested
	// column to the SELECT and GROUP BY when one was given.
	query := ""
	if groupBy != "" {
		query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000), INTERVAL %d minute))*1000) as time, toString(%s) as groupBy, "+
			"%s "+
			"FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ",
			*params.StepSeconds/60, groupBy, function, r.logsDB, r.logsTable, *params.TimestampStart, *params.TimestampEnd)
	} else {
		query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000), INTERVAL %d minute))*1000) as time, "+
			"%s "+
			"FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ",
			*params.StepSeconds/60, function, r.logsDB, r.logsTable, *params.TimestampStart, *params.TimestampEnd)
	}

	if filterSql != nil && *filterSql != "" {
		query += fmt.Sprintf(" AND %s ", *filterSql)
	}

	if groupBy != "" {
		query += fmt.Sprintf("GROUP BY time, toString(%s) as groupBy ORDER BY time", groupBy)
	} else {
		query += "GROUP BY time ORDER BY time"
	}

	zap.S().Debug(query)
	err = r.db.Select(ctx, logAggregatesDBResponseItems, query)
	if err != nil {
		return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
	}

	// Fold the flat DB rows into a map keyed by time bucket; grouped rows
	// that land in the same bucket are merged into one item's GroupBy map.
	aggregateResponse := model.GetLogsAggregatesResponse{
		Items: make(map[int64]model.LogsAggregatesResponseItem),
	}
	for i := range *logAggregatesDBResponseItems {
		if elem, ok := aggregateResponse.Items[(*logAggregatesDBResponseItems)[i].Timestamp]; ok {
			if groupBy != "" && (*logAggregatesDBResponseItems)[i].GroupBy != "" {
				elem.GroupBy[(*logAggregatesDBResponseItems)[i].GroupBy] = (*logAggregatesDBResponseItems)[i].Value
			}
			aggregateResponse.Items[(*logAggregatesDBResponseItems)[i].Timestamp] = elem
		} else {
			if groupBy != "" && (*logAggregatesDBResponseItems)[i].GroupBy != "" {
				aggregateResponse.Items[(*logAggregatesDBResponseItems)[i].Timestamp] = model.LogsAggregatesResponseItem{
					Timestamp: (*logAggregatesDBResponseItems)[i].Timestamp,
					GroupBy:   map[string]interface{}{(*logAggregatesDBResponseItems)[i].GroupBy: (*logAggregatesDBResponseItems)[i].Value},
				}
			} else if groupBy == "" {
				aggregateResponse.Items[(*logAggregatesDBResponseItems)[i].Timestamp] = model.LogsAggregatesResponseItem{
					Timestamp: (*logAggregatesDBResponseItems)[i].Timestamp,
					Value:     (*logAggregatesDBResponseItems)[i].Value,
				}
			}
		}
	}
	return &aggregateResponse, nil
}
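To make the string-building above concrete, here is a sketch of the query the method would render for step=60, groupBy set, no q filter, and the default count() aggregate. The signoz_logs.logs db/table pair and the severity_text column are assumed names for illustration; the real values come from r.logsDB, r.logsTable, and the caller's params.

// A sketch of the rendered SQL (assumed names, invented timestamps):
const exampleAggregateQuery = `
SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000), INTERVAL 1 minute))*1000) as time,
       toString(severity_text) as groupBy,
       toFloat64(count()) as value
FROM signoz_logs.logs
WHERE timestamp >= '1658300000000' AND timestamp <= '1658303600000'
GROUP BY time, toString(severity_text) as groupBy
ORDER BY time`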

View File

@@ -1868,6 +1868,7 @@ func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) {
	subRouter.HandleFunc("/tail", ViewAccess(aH.tailLogs)).Methods(http.MethodGet)
	subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet)
	subRouter.HandleFunc("/fields", ViewAccess(aH.logFieldUpdate)).Methods(http.MethodPost)
	subRouter.HandleFunc("/aggregate", ViewAccess(aH.logAggregate)).Methods(http.MethodGet)
}

func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) {
@@ -1960,3 +1961,19 @@ func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
		}
	}
}
func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
	params, err := logs.ParseLogAggregateParams(r)
	if err != nil {
		apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
		respondError(w, apiErr, "Incorrect params")
		return
	}

	res, apiErr := (*aH.reader).AggregateLogs(r.Context(), params)
	if apiErr != nil {
		respondError(w, apiErr, "Failed to fetch logs aggregate from the DB")
		return
	}

	aH.writeJSON(w, r, res)
}
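A minimal sketch of calling the new route from a client. Only the /aggregate suffix is confirmed by the registration above; the host and the rest of the path prefix are assumptions.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// Hypothetical base URL and prefix; adjust to the actual deployment.
	url := "http://localhost:8080/api/v1/logs/aggregate" +
		"?timestampStart=1658300000000&timestampEnd=1658303600000&step=60&groupBy=severity_text"
	resp, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(body))
}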

View File

@@ -92,6 +92,54 @@ func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error)
	return &res, nil
}
func ParseLogAggregateParams(r *http.Request) (*model.LogsAggregateParams, error) {
	res := model.LogsAggregateParams{}
	params := r.URL.Query()

	// timestampStart, timestampEnd and step are mandatory; q, groupBy and
	// function are optional.
	if val, ok := params["timestampStart"]; ok {
		ts, err := strconv.Atoi(val[0])
		if err != nil {
			return nil, err
		}
		ts64 := uint64(ts)
		res.TimestampStart = &ts64
	} else {
		return nil, fmt.Errorf("timestampStart is required")
	}
	if val, ok := params["timestampEnd"]; ok {
		ts, err := strconv.Atoi(val[0])
		if err != nil {
			return nil, err
		}
		ts64 := uint64(ts)
		res.TimestampEnd = &ts64
	} else {
		return nil, fmt.Errorf("timestampEnd is required")
	}

	if val, ok := params["q"]; ok {
		res.Query = &val[0]
	}
	if val, ok := params["groupBy"]; ok {
		res.GroupBy = &val[0]
	}
	if val, ok := params["function"]; ok {
		res.Function = &val[0]
	}

	if val, ok := params["step"]; ok {
		step, err := strconv.Atoi(val[0])
		if err != nil {
			return nil, err
		}
		res.StepSeconds = &step
	} else {
		return nil, fmt.Errorf("step is required")
	}
	return &res, nil
}
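A quick test-style sketch of the parser's behaviour, fabricating requests with httptest; it assumes this function lives in the logs package, as the calls elsewhere in the diff suggest.

package logs

import (
	"net/http/httptest"
	"testing"
)

func TestParseLogAggregateParams(t *testing.T) {
	// All three required params present; function is optional.
	r := httptest.NewRequest("GET",
		"/aggregate?timestampStart=1658300000000&timestampEnd=1658303600000&step=60&function=avg(duration)", nil)
	params, err := ParseLogAggregateParams(r)
	if err != nil {
		t.Fatal(err)
	}
	if *params.StepSeconds != 60 || *params.Function != "avg(duration)" {
		t.Fatalf("unexpected params: %+v", params)
	}

	// Missing step is rejected.
	r = httptest.NewRequest("GET", "/aggregate?timestampStart=1&timestampEnd=2", nil)
	if _, err := ParseLogAggregateParams(r); err == nil {
		t.Fatal("expected error for missing step")
	}
}

One observation, mirroring the code rather than recommending it: strconv.Atoi parses into the platform int type, so millisecond timestamps only fit on a 64-bit build before the uint64 conversion.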
func parseLogQuery(query string) ([]string, error) {
	sqlQueryTokens := []string{}
	filterTokens := tokenRegex.FindAllString(query, -1)

View File

@@ -63,6 +63,7 @@ type Reader interface {
	UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
	GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError)
	TailLogs(ctx context.Context, client *model.LogsTailClient)
	AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError)

	// Connection needed for rules, not ideal but required
	GetConn() clickhouse.Conn

View File

@@ -340,3 +340,12 @@ type LogsFilterParams struct {
	IdStart *string `json:"idStart"`
	IdEnd   *string `json:"idEnd"`
}

type LogsAggregateParams struct {
	Query          *string `json:"q"`
	TimestampStart *uint64 `json:"timestampStart"`
	TimestampEnd   *uint64 `json:"timestampEnd"`
	GroupBy        *string `json:"groupBy"`
	Function       *string `json:"function"`
	StepSeconds    *int    `json:"step"`
}

View File

@@ -443,3 +443,19 @@ type LogsTailClient struct {
	Error  chan error
	Filter LogsFilterParams
}

type GetLogsAggregatesResponse struct {
	Items map[int64]LogsAggregatesResponseItem `json:"items"`
}

type LogsAggregatesResponseItem struct {
	Timestamp int64                  `json:"timestamp,omitempty"`
	Value     interface{}            `json:"value,omitempty"`
	GroupBy   map[string]interface{} `json:"groupBy,omitempty"`
}

// LogsAggregatesDBResponseItem maps one row of the aggregate query result.
type LogsAggregatesDBResponseItem struct {
	Timestamp int64   `ch:"time"`
	Value     float64 `ch:"value"`
	GroupBy   string  `ch:"groupBy"`
}
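For reference, a self-contained sketch of what a grouped response marshals to; the timestamps and values are invented, and the local types mirror the model types above.

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of GetLogsAggregatesResponse / LogsAggregatesResponseItem.
type responseItem struct {
	Timestamp int64                  `json:"timestamp,omitempty"`
	Value     interface{}            `json:"value,omitempty"`
	GroupBy   map[string]interface{} `json:"groupBy,omitempty"`
}

type response struct {
	Items map[int64]responseItem `json:"items"`
}

func main() {
	// Invented sample: one minute bucket with two severity groups.
	out, _ := json.Marshal(response{Items: map[int64]responseItem{
		1658300040000: {
			Timestamp: 1658300040000,
			GroupBy:   map[string]interface{}{"INFO": 12.0, "ERROR": 3.0},
		},
	}})
	fmt.Println(string(out))
	// {"items":{"1658300040000":{"timestamp":1658300040000,"groupBy":{"ERROR":3,"INFO":12}}}}
}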