mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-11 17:49:02 +08:00
fixes for CH API implementations
This commit is contained in:
parent
51fe634566
commit
bba7344bae
Binary file not shown.
@ -74,41 +74,16 @@ func connect(cfg *namespaceConfig) (*sqlx.DB, error) {
|
||||
return cfg.Connector(cfg)
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) getStrings(ctx context.Context, sql string, args ...interface{}) ([]string, error) {
|
||||
rows, err := r.db.QueryContext(ctx, sql, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
values := []string{}
|
||||
|
||||
for rows.Next() {
|
||||
var value string
|
||||
if err := rows.Scan(&value); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
values = append(values, value)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return values, nil
|
||||
}
|
||||
|
||||
// GetServices fetches the sorted service list that have not expired
|
||||
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, error) {
|
||||
|
||||
if r.indexTable == "" {
|
||||
return nil, ErrNoIndexTable
|
||||
}
|
||||
|
||||
var serviceItems []model.ServiceItem
|
||||
serviceItems := []model.ServiceItem{}
|
||||
|
||||
query := fmt.Sprintf("SELECT serviceName, quantile(0.99)(durationNano) as p99, avg(durationNano) as avgDuration, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' GROUP BY serviceName", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
@ -118,8 +93,58 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
if serviceItems == nil {
|
||||
serviceItems = []model.ServiceItem{}
|
||||
////////////////// Below block gets 5xx of services
|
||||
serviceErrorItems := []model.ServiceItem{}
|
||||
|
||||
query = fmt.Sprintf("SELECT serviceName, count(*) as numErrors FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND statusCode>=500 GROUP BY serviceName", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err = r.db.Select(&serviceErrorItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
m5xx := make(map[string]int)
|
||||
|
||||
for j, _ := range serviceErrorItems {
|
||||
m5xx[serviceErrorItems[j].ServiceName] = serviceErrorItems[j].NumErrors
|
||||
}
|
||||
///////////////////////////////////////////
|
||||
|
||||
////////////////// Below block gets 4xx of services
|
||||
|
||||
service4xxItems := []model.ServiceItem{}
|
||||
|
||||
query = fmt.Sprintf("SELECT serviceName, count(*) as num4xx FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND statusCode>=400 AND statusCode<500 GROUP BY serviceName", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err = r.db.Select(&service4xxItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
m4xx := make(map[string]int)
|
||||
|
||||
for j, _ := range service4xxItems {
|
||||
m5xx[service4xxItems[j].ServiceName] = service4xxItems[j].Num4XX
|
||||
}
|
||||
|
||||
for i, _ := range serviceItems {
|
||||
if val, ok := m5xx[serviceItems[i].ServiceName]; ok {
|
||||
serviceItems[i].NumErrors = val
|
||||
}
|
||||
if val, ok := m4xx[serviceItems[i].ServiceName]; ok {
|
||||
serviceItems[i].Num4XX = val
|
||||
}
|
||||
serviceItems[i].CallRate = float32(serviceItems[i].NumCalls) / float32(queryParams.Period)
|
||||
serviceItems[i].FourXXRate = float32(serviceItems[i].Num4XX) / float32(queryParams.Period)
|
||||
serviceItems[i].ErrorRate = float32(serviceItems[i].NumErrors) / float32(queryParams.Period)
|
||||
}
|
||||
|
||||
return &serviceItems, nil
|
||||
@ -127,7 +152,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G
|
||||
|
||||
func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) {
|
||||
|
||||
var serviceOverviewItems []model.ServiceOverviewItem
|
||||
serviceOverviewItems := []model.ServiceOverviewItem{}
|
||||
|
||||
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, quantile(0.99)(durationNano) as p99, quantile(0.95)(durationNano) as p95,quantile(0.50)(durationNano) as p50, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND serviceName='%s' GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName)
|
||||
|
||||
@ -140,14 +165,36 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
serviceErrorItems := []model.ServiceErrorItem{}
|
||||
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, count(*) as numErrors FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND serviceName='%s' AND statusCode>=500 GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName)
|
||||
|
||||
err = r.db.Select(&serviceErrorItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
m := make(map[int64]int)
|
||||
|
||||
for j, _ := range serviceErrorItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceErrorItems[j].Time)
|
||||
m[int64(timeObj.UnixNano())] = serviceErrorItems[j].NumErrors
|
||||
}
|
||||
|
||||
for i, _ := range serviceOverviewItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceOverviewItems[i].Time)
|
||||
serviceOverviewItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
serviceOverviewItems[i].Time = ""
|
||||
}
|
||||
|
||||
if serviceOverviewItems == nil {
|
||||
serviceOverviewItems = []model.ServiceOverviewItem{}
|
||||
if val, ok := m[serviceOverviewItems[i].Timestamp]; ok {
|
||||
serviceOverviewItems[i].NumErrors = val
|
||||
}
|
||||
serviceOverviewItems[i].ErrorRate = float32(serviceOverviewItems[i].NumErrors) * 100 / float32(serviceOverviewItems[i].NumCalls)
|
||||
serviceOverviewItems[i].CallRate = float32(serviceOverviewItems[i].NumCalls) / float32(queryParams.StepSeconds)
|
||||
}
|
||||
|
||||
return &serviceOverviewItems, nil
|
||||
@ -189,6 +236,11 @@ func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.S
|
||||
|
||||
for _, item := range queryParams.Tags {
|
||||
|
||||
if item.Key == "error" && item.Value == "true" {
|
||||
query = query + " AND ( has(tags, 'error:true') OR statusCode>=500)"
|
||||
continue
|
||||
}
|
||||
|
||||
if item.Operator == "equals" {
|
||||
query = query + " AND has(tags, ?)"
|
||||
args = append(args, fmt.Sprintf("%s:%s", item.Key, item.Value))
|
||||
@ -204,13 +256,10 @@ func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.S
|
||||
return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator)
|
||||
}
|
||||
|
||||
if item.Key == "error" && item.Value == "true" {
|
||||
query = query + " OR statusCode>=500"
|
||||
}
|
||||
query = query + " ORDER BY timestamp LIMIT 100"
|
||||
|
||||
}
|
||||
|
||||
query = query + " ORDER BY timestamp DESC LIMIT 100"
|
||||
|
||||
var searchScanReponses []model.SearchSpanReponseItem
|
||||
|
||||
err := r.db.Select(&searchScanReponses, query, args...)
|
||||
|
@ -469,7 +469,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]model.Se
|
||||
|
||||
////////////////// Below block gets 4xx of services
|
||||
|
||||
sqlQuery = fmt.Sprintf(`SELECT COUNT(SpanId) as numErrors, "ServiceName" as "serviceName" FROM %s WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' and "StatusCode">=400 and "StatusCode" < 500 GROUP BY "ServiceName"`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
sqlQuery = fmt.Sprintf(`SELECT COUNT(SpanId) as num4xx, "ServiceName" as "serviceName" FROM %s WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' and "StatusCode">=400 and "StatusCode" < 500 GROUP BY "ServiceName"`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
|
||||
response4xx, err := client.Query(sqlQuery, "object")
|
||||
|
||||
|
@ -15,7 +15,7 @@ type ServiceItem struct {
|
||||
CallRate float32 `json:"callRate" db:"callRate"`
|
||||
NumErrors int `json:"numErrors" db:"numErrors"`
|
||||
ErrorRate float32 `json:"errorRate" db:"errorRate"`
|
||||
Num4XX int `json:"num4XX" db:"num4XX"`
|
||||
Num4XX int `json:"num4XX" db:"num4xx"`
|
||||
FourXXRate float32 `json:"fourXXRate" db:"fourXXRate"`
|
||||
}
|
||||
|
||||
@ -26,9 +26,9 @@ type ServiceListErrorItem struct {
|
||||
}
|
||||
|
||||
type ServiceErrorItem struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
NumErrors int `json:"numErrors"`
|
||||
Time string `json:"time,omitempty" db:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp" db:"timestamp"`
|
||||
NumErrors int `json:"numErrors" db:"numErrors"`
|
||||
}
|
||||
|
||||
type ServiceOverviewItem struct {
|
||||
|
Loading…
x
Reference in New Issue
Block a user