clickhouse implementation WIP

This commit is contained in:
Ankit Anand 2021-05-30 11:14:55 +05:30
parent 762a3cdfcd
commit 6aed23ce66
6 changed files with 156 additions and 54 deletions

Binary file not shown.

View File

@ -242,11 +242,11 @@ func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.S
func (r *ClickHouseReader) GetServiceDBOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) {
var serviceDBOverviewItem []model.ServiceDBOverviewItem
var serviceDBOverviewItems []model.ServiceDBOverviewItem
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, dbSystem FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND dbName IS NOT NULL GROUP BY time, dbSystem ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
err := r.db.Select(&serviceDBOverviewItem, query)
err := r.db.Select(&serviceDBOverviewItems, query)
zap.S().Info(query)
@ -255,17 +255,104 @@ func (r *ClickHouseReader) GetServiceDBOverview(ctx context.Context, queryParams
return nil, fmt.Errorf("Error in processing sql query")
}
for i, _ := range serviceDBOverviewItem {
timeObj, _ := time.Parse(time.RFC3339Nano, serviceDBOverviewItem[i].Time)
serviceDBOverviewItem[i].Timestamp = int64(timeObj.UnixNano())
serviceDBOverviewItem[i].Time = ""
serviceDBOverviewItem[i].CallRate = float32(serviceDBOverviewItem[i].NumCalls) / float32(queryParams.StepSeconds)
for i, _ := range serviceDBOverviewItems {
timeObj, _ := time.Parse(time.RFC3339Nano, serviceDBOverviewItems[i].Time)
serviceDBOverviewItems[i].Timestamp = int64(timeObj.UnixNano())
serviceDBOverviewItems[i].Time = ""
serviceDBOverviewItems[i].CallRate = float32(serviceDBOverviewItems[i].NumCalls) / float32(queryParams.StepSeconds)
}
if serviceDBOverviewItem == nil {
serviceDBOverviewItem = []model.ServiceDBOverviewItem{}
if serviceDBOverviewItems == nil {
serviceDBOverviewItems = []model.ServiceDBOverviewItem{}
}
return &serviceDBOverviewItem, nil
return &serviceDBOverviewItems, nil
}
func (r *ClickHouseReader) GetServiceExternalAvgDuration(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
var serviceExternalItems []model.ServiceExternalItem
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
err := r.db.Select(&serviceExternalItems, query)
zap.S().Info(query)
if err != nil {
zap.S().Debug("Error in processing sql query: ", err)
return nil, fmt.Errorf("Error in processing sql query")
}
for i, _ := range serviceExternalItems {
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time)
serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano())
serviceExternalItems[i].Time = ""
serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds)
}
if serviceExternalItems == nil {
serviceExternalItems = []model.ServiceExternalItem{}
}
return &serviceExternalItems, nil
}
func (r *ClickHouseReader) GetServiceExternalErrors(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
var serviceExternalItems []model.ServiceExternalItem
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL AND StatusCode >= 500 GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
err := r.db.Select(&serviceExternalItems, query)
zap.S().Info(query)
if err != nil {
zap.S().Debug("Error in processing sql query: ", err)
return nil, fmt.Errorf("Error in processing sql query")
}
for i, _ := range serviceExternalItems {
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time)
serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano())
serviceExternalItems[i].Time = ""
serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds)
}
if serviceExternalItems == nil {
serviceExternalItems = []model.ServiceExternalItem{}
}
return &serviceExternalItems, nil
}
func (r *ClickHouseReader) GetServiceExternal(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
var serviceExternalItems []model.ServiceExternalItem
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
err := r.db.Select(&serviceExternalItems, query)
zap.S().Info(query)
if err != nil {
zap.S().Debug("Error in processing sql query: ", err)
return nil, fmt.Errorf("Error in processing sql query")
}
for i, _ := range serviceExternalItems {
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time)
serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano())
serviceExternalItems[i].Time = ""
serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds)
}
if serviceExternalItems == nil {
serviceExternalItems = []model.ServiceExternalItem{}
}
return &serviceExternalItems, nil
}

View File

@ -54,3 +54,15 @@ func (druid *DruidReader) SearchSpans(ctx context.Context, query *model.SpanSear
func (druid *DruidReader) GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) {
return druidQuery.GetServiceDBOverview(druid.SqlClient, query)
}
func (druid *DruidReader) GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
return druidQuery.GetServiceExternalAvgDuration(druid.SqlClient, query)
}
func (druid *DruidReader) GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
return druidQuery.GetServiceExternalErrors(druid.SqlClient, query)
}
func (druid *DruidReader) GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
return druidQuery.GetServiceExternal(druid.SqlClient, query)
}

View File

@ -61,9 +61,9 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
// router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/dbOverview", aH.getServiceDBOverview).Methods(http.MethodGet)
// router.HandleFunc("/api/v1/service/externalAvgDuration", aH.GetServiceExternalAvgDuration).Methods(http.MethodGet)
// router.HandleFunc("/api/v1/service/externalErrors", aH.getServiceExternalErrors).Methods(http.MethodGet)
// router.HandleFunc("/api/v1/service/external", aH.getServiceExternal).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/externalAvgDuration", aH.GetServiceExternalAvgDuration).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/externalErrors", aH.getServiceExternalErrors).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/external", aH.getServiceExternal).Methods(http.MethodGet)
// router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet)
// router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet)
router.HandleFunc("/api/v1/spans", aH.searchSpans).Methods(http.MethodGet)
@ -194,53 +194,53 @@ func (aH *APIHandler) getServiceDBOverview(w http.ResponseWriter, r *http.Reques
}
// func (aH *APIHandler) getServiceExternal(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) getServiceExternal(w http.ResponseWriter, r *http.Request) {
// query, err := parseGetServiceExternalRequest(r)
// if aH.handleError(w, err, http.StatusBadRequest) {
// return
// }
query, err := parseGetServiceExternalRequest(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
// result, err := druidQuery.GetServiceExternal(aH.sqlClient, query)
// if aH.handleError(w, err, http.StatusBadRequest) {
// return
// }
result, err := (*aH.reader).GetServiceExternal(context.Background(), query)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
// aH.writeJSON(w, r, result)
aH.writeJSON(w, r, result)
// }
}
// func (aH *APIHandler) GetServiceExternalAvgDuration(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) GetServiceExternalAvgDuration(w http.ResponseWriter, r *http.Request) {
// query, err := parseGetServiceExternalRequest(r)
// if aH.handleError(w, err, http.StatusBadRequest) {
// return
// }
query, err := parseGetServiceExternalRequest(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
// result, err := druidQuery.GetServiceExternalAvgDuration(aH.sqlClient, query)
// if aH.handleError(w, err, http.StatusBadRequest) {
// return
// }
result, err := (*aH.reader).GetServiceExternalAvgDuration(context.Background(), query)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
// aH.writeJSON(w, r, result)
aH.writeJSON(w, r, result)
// }
}
// func (aH *APIHandler) getServiceExternalErrors(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) getServiceExternalErrors(w http.ResponseWriter, r *http.Request) {
// query, err := parseGetServiceExternalRequest(r)
// if aH.handleError(w, err, http.StatusBadRequest) {
// return
// }
query, err := parseGetServiceExternalRequest(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
// result, err := druidQuery.GetServiceExternalErrors(aH.sqlClient, query)
// if aH.handleError(w, err, http.StatusBadRequest) {
// return
// }
result, err := (*aH.reader).GetServiceExternalErrors(context.Background(), query)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
// aH.writeJSON(w, r, result)
aH.writeJSON(w, r, result)
// }
}
func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) {

View File

@ -12,4 +12,7 @@ type Reader interface {
// GetApplicationPercentiles(ctx context.Context, query *model.ApplicationPercentileParams) ([]godruid.Timeseries, error)
SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error)
GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error)
GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
}

View File

@ -67,14 +67,14 @@ func (item *SearchSpanReponseItem) GetValues() []interface{} {
}
type ServiceExternalItem struct {
Time string `json:"time,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
ExternalHttpUrl string `json:"externalHttpUrl,omitempty"`
AvgDuration float32 `json:"avgDuration,omitempty"`
NumCalls int `json:"numCalls,omitempty"`
CallRate float32 `json:"callRate,omitempty"`
NumErrors int `json:"numErrors"`
ErrorRate float32 `json:"errorRate"`
Time string `json:"time,omitempty" db:"time,omitempty"`
Timestamp int64 `json:"timestamp,omitempty" db:"timestamp,omitempty"`
ExternalHttpUrl string `json:"externalHttpUrl,omitempty" db:"externalHttpUrl,omitempty"`
AvgDuration float32 `json:"avgDuration,omitempty" db:"avgDuration,omitempty"`
NumCalls int `json:"numCalls,omitempty" db:"numCalls,omitempty"`
CallRate float32 `json:"callRate,omitempty" db:"callRate,omitempty"`
NumErrors int `json:"numErrors" db:"numErrors"`
ErrorRate float32 `json:"errorRate" db:"errorRate"`
}
type ServiceDBOverviewItem struct {