chore: only fetch top level operation from the selected time window (#5404)

This commit is contained in:
Srikanth Chekuri 2024-07-30 02:02:50 +05:30 committed by GitHub
parent 1281330c52
commit 38e694cd36
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 55 additions and 23 deletions

View File

@ -706,21 +706,25 @@ func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, erro
return &services, nil
}
func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time) (*map[string][]string, *map[string][]string, *model.ApiError) {
func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time, services []string) (*map[string][]string, *model.ApiError) {
start = start.In(time.UTC)
// The `top_level_operations` that have `time` >= start
operations := map[string][]string{}
// All top level operations for a service
allOperations := map[string][]string{}
query := fmt.Sprintf(`SELECT DISTINCT name, serviceName, time FROM %s.%s`, r.TraceDB, r.topLevelOperationsTable)
// We can't use the `end` because the `top_level_operations` table has the most recent instances of the operations
// We can only use the `start` time to filter the operations
query := fmt.Sprintf(`SELECT name, serviceName, max(time) as ts FROM %s.%s WHERE time >= @start`, r.TraceDB, r.topLevelOperationsTable)
if len(services) > 0 {
query += ` AND serviceName IN @services`
}
query += ` GROUP BY name, serviceName ORDER BY ts DESC LIMIT 5000`
rows, err := r.db.Query(ctx, query)
rows, err := r.db.Query(ctx, query, clickhouse.Named("start", start), clickhouse.Named("services", services))
if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
}
defer rows.Close()
@ -728,25 +732,17 @@ func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig
var name, serviceName string
var t time.Time
if err := rows.Scan(&name, &serviceName, &t); err != nil {
return nil, nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error in reading data")}
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error in reading data")}
}
if _, ok := operations[serviceName]; !ok {
operations[serviceName] = []string{}
}
if _, ok := allOperations[serviceName]; !ok {
allOperations[serviceName] = []string{}
operations[serviceName] = []string{"overflow_operation"}
}
if skipConfig.ShouldSkip(serviceName, name) {
continue
}
allOperations[serviceName] = append(allOperations[serviceName], name)
// We can't use the `end` because the `top_level_operations` table has the most recent instances of the operations
// We can only use the `start` time to filter the operations
if t.After(start) {
operations[serviceName] = append(operations[serviceName], name)
}
operations[serviceName] = append(operations[serviceName], name)
}
return &operations, &allOperations, nil
return &operations, nil
}
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) {
@ -755,7 +751,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
}
topLevelOps, allTopLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End)
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End, nil)
if apiErr != nil {
return nil, apiErr
}
@ -779,7 +775,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
// the top level operations are high, we want to warn to let user know the issue
// with the instrumentation
serviceItem.DataWarning = model.DataWarning{
TopLevelOps: (*allTopLevelOps)[svc],
TopLevelOps: (*topLevelOps)[svc],
}
// default max_query_size = 262144
@ -868,7 +864,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) {
topLevelOps, _, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End)
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End, nil)
if apiErr != nil {
return nil, apiErr
}

View File

@ -1347,8 +1347,44 @@ func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request)
func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Request) {
var start, end time.Time
var services []string
result, _, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig, start, end)
type topLevelOpsParams struct {
Service string `json:"service"`
Start string `json:"start"`
End string `json:"end"`
}
var params topLevelOpsParams
err := json.NewDecoder(r.Body).Decode(&params)
if err != nil {
zap.L().Error("Error in getting req body for get top operations API", zap.Error(err))
}
if params.Service != "" {
services = []string{params.Service}
}
startEpoch := params.Start
if startEpoch != "" {
startEpochInt, err := strconv.ParseInt(startEpoch, 10, 64)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading start time")
return
}
start = time.Unix(0, startEpochInt)
}
endEpoch := params.End
if endEpoch != "" {
endEpochInt, err := strconv.ParseInt(endEpoch, 10, 64)
if err != nil {
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading end time")
return
}
end = time.Unix(0, endEpochInt)
}
result, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig, start, end, services)
if apiErr != nil {
RespondError(w, apiErr, nil)
return

View File

@ -23,7 +23,7 @@ type Reader interface {
GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError)
GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time) (*map[string][]string, *map[string][]string, *model.ApiError)
GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time, services []string) (*map[string][]string, *model.ApiError)
GetServices(ctx context.Context, query *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError)
GetTopOperations(ctx context.Context, query *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError)
GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error)