chore: limit number of top level operations in services list (#4666)

This commit is contained in:
Srikanth Chekuri 2024-03-12 17:22:48 +05:30 committed by GitHub
parent d9b379ae51
commit c6c2b9d809
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 62 additions and 21 deletions

View File

@ -756,33 +756,47 @@ func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, erro
return &services, nil return &services, nil
} }
func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig) (*map[string][]string, *model.ApiError) { func (r *ClickHouseReader) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time) (*map[string][]string, *map[string][]string, *model.ApiError) {
start = start.In(time.UTC)
// The `top_level_operations` that have `time` >= start
operations := map[string][]string{} operations := map[string][]string{}
query := fmt.Sprintf(`SELECT DISTINCT name, serviceName FROM %s.%s`, r.TraceDB, r.topLevelOperationsTable) // All top level operations for a service
allOperations := map[string][]string{}
query := fmt.Sprintf(`SELECT DISTINCT name, serviceName, time FROM %s.%s`, r.TraceDB, r.topLevelOperationsTable)
rows, err := r.db.Query(ctx, query) rows, err := r.db.Query(ctx, query)
if err != nil { if err != nil {
zap.S().Error("Error in processing sql query: ", err) zap.S().Error("Error in processing sql query: ", err)
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")} return nil, nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
} }
defer rows.Close() defer rows.Close()
for rows.Next() { for rows.Next() {
var name, serviceName string var name, serviceName string
if err := rows.Scan(&name, &serviceName); err != nil { var t time.Time
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("Error in reading data")} if err := rows.Scan(&name, &serviceName, &t); err != nil {
return nil, nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("error in reading data")}
} }
if _, ok := operations[serviceName]; !ok { if _, ok := operations[serviceName]; !ok {
operations[serviceName] = []string{} operations[serviceName] = []string{}
} }
if _, ok := allOperations[serviceName]; !ok {
allOperations[serviceName] = []string{}
}
if skipConfig.ShouldSkip(serviceName, name) { if skipConfig.ShouldSkip(serviceName, name) {
continue continue
} }
allOperations[serviceName] = append(allOperations[serviceName], name)
// We can't use the `end` because the `top_level_operations` table has the most recent instances of the operations
// We can only use the `start` time to filter the operations
if t.After(start) {
operations[serviceName] = append(operations[serviceName], name) operations[serviceName] = append(operations[serviceName], name)
} }
return &operations, nil }
return &operations, &allOperations, nil
} }
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) { func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) {
@ -791,7 +805,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable} return nil, &model.ApiError{Typ: model.ErrorExec, Err: ErrNoIndexTable}
} }
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig) topLevelOps, allTopLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End)
if apiErr != nil { if apiErr != nil {
return nil, apiErr return nil, apiErr
} }
@ -810,6 +824,22 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
defer func() { <-sem }() defer func() { <-sem }()
var serviceItem model.ServiceItem var serviceItem model.ServiceItem
var numErrors uint64 var numErrors uint64
// Even when only a few operations fall within the time range, the total number of
// top level operations for the service may still be high. We attach all of them so
// that the user can be warned about the issue with the instrumentation
serviceItem.DataWarning = model.DataWarning{
TopLevelOps: (*allTopLevelOps)[svc],
}
// default max_query_size = 262144
// Let's assume the average size of the item in `ops` is 50 bytes
// We can have 262144/50 = 5242 items in the `ops` array
// Although we could make it as big as 5k, we cap the number of items
// in the `ops` array to 1500
ops = ops[:int(math.Min(1500, float64(len(ops))))]
query := fmt.Sprintf( query := fmt.Sprintf(
`SELECT `SELECT
quantile(0.99)(durationNano) as p99, quantile(0.99)(durationNano) as p99,
@ -858,6 +888,10 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
return return
} }
subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags) subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags)
if errStatus != nil {
zap.S().Error("Error building query with tag params: ", err)
return
}
query += subQuery query += subQuery
args = append(args, argsSubQuery...) args = append(args, argsSubQuery...)
err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors) err = r.db.QueryRow(ctx, errorQuery, args...).Scan(&numErrors)
@ -884,7 +918,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) { func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) {
topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig) topLevelOps, _, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End)
if apiErr != nil { if apiErr != nil {
return nil, apiErr return nil, apiErr
} }
@ -1576,7 +1610,7 @@ func buildQueryWithTagParams(ctx context.Context, tags []model.TagQuery) (string
case model.NotExistsOperator: case model.NotExistsOperator:
subQuery, argsSubQuery = addExistsOperator(item, tagMapType, true) subQuery, argsSubQuery = addExistsOperator(item, tagMapType, true)
default: default:
return "", nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Tag Operator %s not supported", item.GetOperator())} return "", nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("filter operator %s not supported", item.GetOperator())}
} }
query += subQuery query += subQuery
args = append(args, argsSubQuery...) args = append(args, argsSubQuery...)

View File

@ -1556,7 +1556,9 @@ func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request)
func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Request) {
result, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig) var start, end time.Time
result, _, apiErr := aH.reader.GetTopLevelOperations(r.Context(), aH.skipConfig, start, end)
if apiErr != nil { if apiErr != nil {
RespondError(w, apiErr, nil) RespondError(w, apiErr, nil)
return return

View File

@ -23,7 +23,7 @@ type Reader interface {
GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError)
GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig) (*map[string][]string, *model.ApiError) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time) (*map[string][]string, *map[string][]string, *model.ApiError)
GetServices(ctx context.Context, query *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) GetServices(ctx context.Context, query *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError)
GetTopOperations(ctx context.Context, query *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) GetTopOperations(ctx context.Context, query *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError)
GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error) GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error)

View File

@ -171,6 +171,10 @@ type AlertingRuleResponse struct {
// Value float64 `json:"value"` // Value float64 `json:"value"`
} }
// DataWarning carries additional data attached to a ServiceItem (its
// DataWarning field) so that potential problems with the reported data can be
// surfaced to the user.
type DataWarning struct {
// TopLevelOps is a list of top level operation names; serialized as "topLevelOps".
// NOTE(review): GetServices appears to fill this with all top level operations of
// the service, not only those in the queried time range; confirm against the caller.
TopLevelOps []string `json:"topLevelOps"`
}
type ServiceItem struct { type ServiceItem struct {
ServiceName string `json:"serviceName" ch:"serviceName"` ServiceName string `json:"serviceName" ch:"serviceName"`
Percentile99 float64 `json:"p99" ch:"p99"` Percentile99 float64 `json:"p99" ch:"p99"`
@ -181,6 +185,7 @@ type ServiceItem struct {
ErrorRate float64 `json:"errorRate" ch:"errorRate"` ErrorRate float64 `json:"errorRate" ch:"errorRate"`
Num4XX uint64 `json:"num4XX" ch:"num4xx"` Num4XX uint64 `json:"num4XX" ch:"num4xx"`
FourXXRate float64 `json:"fourXXRate" ch:"fourXXRate"` FourXXRate float64 `json:"fourXXRate" ch:"fourXXRate"`
DataWarning DataWarning `json:"dataWarning"`
} }
type ServiceErrorItem struct { type ServiceErrorItem struct {
Time time.Time `json:"time" ch:"time"` Time time.Time `json:"time" ch:"time"`