From fff38b58d28e097d3c3f6aab7dcefaac238fa83d Mon Sep 17 00:00:00 2001 From: Ankit Anand Date: Sat, 29 May 2021 16:32:11 +0530 Subject: [PATCH] span search api working --- .../app/clickhouseReader/reader.go | 127 +++++++++++++++++- pkg/query-service/app/druidReader/reader.go | 4 +- pkg/query-service/app/http_handler.go | 50 +++---- pkg/query-service/app/interface.go | 3 +- pkg/query-service/app/parser.go | 8 ++ pkg/query-service/constants/constants.go | 2 +- pkg/query-service/druidQuery/query.go | 13 +- pkg/query-service/model/queryParams.go | 11 +- pkg/query-service/model/response.go | 48 +++++-- 9 files changed, 223 insertions(+), 43 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index fb9add11f8..f83b8e73f4 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "strconv" "time" _ "github.com/ClickHouse/clickhouse-go" @@ -107,14 +108,134 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G var serviceItems []model.ServiceItem - query := fmt.Sprintf("SELECT service as serviceName, avg(durationNano) as avgDuration FROM %s GROUP BY service", r.indexTable) - + query := fmt.Sprintf("SELECT serviceName, quantile(0.99)(durationNano) as p99, avg(durationNano) as avgDuration, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' GROUP BY serviceName", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) err := r.db.Select(&serviceItems, query) + zap.S().Info(query) + if err != nil { zap.S().Debug("Error in processing sql query: ", err) - return nil, err + return nil, fmt.Errorf("Error in processing sql query") + } + + if serviceItems == nil { + serviceItems = []model.ServiceItem{} } return &serviceItems, nil } + +func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) { + + var serviceOverviewItems []model.ServiceOverviewItem + + query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, quantile(0.99)(durationNano) as p99, quantile(0.95)(durationNano) as p95,quantile(0.50)(durationNano) as p50, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND serviceName='%s' GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName) + + err := r.db.Select(&serviceOverviewItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + for i, _ := range serviceOverviewItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceOverviewItems[i].Time) + serviceOverviewItems[i].Timestamp = int64(timeObj.UnixNano()) + serviceOverviewItems[i].Time = "" + } + + if serviceOverviewItems == nil { + serviceOverviewItems = []model.ServiceOverviewItem{} + } + + return &serviceOverviewItems, nil + +} + +func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.SpanSearchParams) (*[]model.SearchSpansResult, error) { + + query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + + args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)} + + if len(queryParams.ServiceName) != 0 { + query = query + " AND serviceName = ?" + args = append(args, queryParams.ServiceName) + } + + if len(queryParams.OperationName) != 0 { + + query = query + " AND name = ?" + args = append(args, queryParams.OperationName) + + } + + if len(queryParams.Kind) != 0 { + query = query + " AND kind = ?" + args = append(args, queryParams.Kind) + + } + + // // zap.S().Debug("MinDuration: ", queryParams.MinDuration) + // var lower string + // var upper string + + if len(queryParams.MinDuration) != 0 { + query = query + " AND durationNano >= ?" + args = append(args, queryParams.MinDuration) + } + if len(queryParams.MaxDuration) != 0 { + query = query + " AND durationNano <= ?" + args = append(args, queryParams.MaxDuration) + } + + for _, item := range queryParams.Tags { + + if item.Operator == "equals" { + query = query + " AND has(tags, ?)" + args = append(args, fmt.Sprintf("%s:%s", item.Key, item.Value)) + + } else if item.Operator == "contains" { + query = query + " AND tagsValues[indexOf(tagsKeys, ?)] ILIKE ?" + args = append(args, item.Key) + args = append(args, fmt.Sprintf("%%%s%%", item.Value)) + } else if item.Operator == "isnotnull" { + query = query + " AND has(tagsKeys, ?)" + args = append(args, item.Key) + } else { + return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator) + } + + if item.Key == "error" && item.Value == "true" { + + } + + } + + var searchScanReponses []model.SearchSpanReponseItem + + err := r.db.Select(&searchScanReponses, query, args...) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + searchSpansResult := []model.SearchSpansResult{ + model.SearchSpansResult{ + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues"}, + Events: make([][]interface{}, len(searchScanReponses)), + }, + } + + for i, item := range searchScanReponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } + + return &searchSpansResult, nil +} diff --git a/pkg/query-service/app/druidReader/reader.go b/pkg/query-service/app/druidReader/reader.go index f55851b2fd..62043be8f6 100644 --- a/pkg/query-service/app/druidReader/reader.go +++ b/pkg/query-service/app/druidReader/reader.go @@ -47,6 +47,6 @@ func (druid *DruidReader) GetServices(ctx context.Context, query *model.GetServi return druidQuery.GetServices(druid.SqlClient, query) } -func (druid *DruidReader) GetApplicationPercentiles(ctx context.Context, query *model.ApplicationPercentileParams) ([]godruid.Timeseries, error) { - return druidQuery.GetApplicationPercentiles(druid.Client, query) +func (druid *DruidReader) SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error) { + return druidQuery.SearchSpans(druid.Client, query) } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index d5825509b5..084b8d57ac 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -59,14 +59,14 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { // router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet) router.HandleFunc("/api/v1/services", aH.getServices).Methods(http.MethodGet) // router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet) + router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet) // router.HandleFunc("/api/v1/service/dbOverview", aH.getServiceDBOverview).Methods(http.MethodGet) // router.HandleFunc("/api/v1/service/externalAvgDuration", aH.GetServiceExternalAvgDuration).Methods(http.MethodGet) // router.HandleFunc("/api/v1/service/externalErrors", aH.getServiceExternalErrors).Methods(http.MethodGet) // router.HandleFunc("/api/v1/service/external", aH.getServiceExternal).Methods(http.MethodGet) // router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet) // router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/spans", aH.searchSpans).Methods(http.MethodGet) + router.HandleFunc("/api/v1/spans", aH.searchSpans).Methods(http.MethodGet) // router.HandleFunc("/api/v1/spans/aggregates", aH.searchSpansAggregates).Methods(http.MethodGet) // router.HandleFunc("/api/v1/tags", aH.searchTags).Methods(http.MethodGet) // router.HandleFunc("/api/v1/traces/{traceId}", aH.searchTraces).Methods(http.MethodGet) @@ -241,21 +241,21 @@ func (aH *APIHandler) user(w http.ResponseWriter, r *http.Request) { // } -// func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) { -// query, err := parseGetServiceOverviewRequest(r) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + query, err := parseGetServiceOverviewRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// result, err := (*aH.reader).GetServiceOverview(context.Background(), query) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + result, err := (*aH.reader).GetServiceOverview(context.Background(), query) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// aH.writeJSON(w, r, result) + aH.writeJSON(w, r, result) -// } +} func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) { @@ -322,20 +322,22 @@ func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) { // aH.writeJSON(w, r, result) // } -// func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) { -// query, err := parseSpanSearchRequest(r) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + query, err := parseSpanSearchRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// result, err := druidQuery.SearchSpans(aH.client, query) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + // result, err := druidQuery.SearchSpans(aH.client, query) + result, err := (*aH.reader).SearchSpans(context.Background(), query) -// aH.writeJSON(w, r, result) -// } + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + aH.writeJSON(w, r, result) +} // func (aH *APIHandler) getApplicationPercentiles(w http.ResponseWriter, r *http.Request) { // // vars := mux.Vars(r) diff --git a/pkg/query-service/app/interface.go b/pkg/query-service/app/interface.go index 47f4c74a82..6408422786 100644 --- a/pkg/query-service/app/interface.go +++ b/pkg/query-service/app/interface.go @@ -7,7 +7,8 @@ import ( ) type Reader interface { - // GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) + GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, error) // GetApplicationPercentiles(ctx context.Context, query *model.ApplicationPercentileParams) ([]godruid.Timeseries, error) + SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error) } diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 2dd1714ffc..025516a3a4 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -101,7 +101,9 @@ func parseGetServiceExternalRequest(r *http.Request) (*model.GetServiceOverviewP } getServiceOverviewParams := model.GetServiceOverviewParams{ + Start: startTime, StartTime: startTime.Format(time.RFC3339Nano), + End: endTime, EndTime: endTime.Format(time.RFC3339Nano), ServiceName: serviceName, Period: fmt.Sprintf("PT%dM", stepInt/60), @@ -137,7 +139,9 @@ func parseGetServiceOverviewRequest(r *http.Request) (*model.GetServiceOverviewP } getServiceOverviewParams := model.GetServiceOverviewParams{ + Start: startTime, StartTime: startTime.Format(time.RFC3339Nano), + End: endTime, EndTime: endTime.Format(time.RFC3339Nano), ServiceName: serviceName, Period: fmt.Sprintf("PT%dM", stepInt/60), @@ -160,7 +164,9 @@ func parseGetServicesRequest(r *http.Request) (*model.GetServicesParams, error) } getServicesParams := model.GetServicesParams{ + Start: startTime, StartTime: startTime.Format(time.RFC3339Nano), + End: endTime, EndTime: endTime.Format(time.RFC3339Nano), Period: int(endTime.Unix() - startTime.Unix()), } @@ -283,6 +289,8 @@ func parseSpanSearchRequest(r *http.Request) (*model.SpanSearchParams, error) { // fmt.Println(startTimeStr) params := &model.SpanSearchParams{ Intervals: fmt.Sprintf("%s/%s", startTimeStr, endTimeStr), + Start: startTime, + End: endTime, Limit: 100, Order: "descending", } diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index c17581781a..df6ae359ac 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -4,7 +4,7 @@ import ( "os" ) -const HTTPHostPort = "0.0.0.0:8080" +const HTTPHostPort = "0.0.0.0:6060" var DruidClientUrl = os.Getenv("DruidClientUrl") var DruidDatasource = os.Getenv("DruidDatasource") diff --git a/pkg/query-service/druidQuery/query.go b/pkg/query-service/druidQuery/query.go index 7aabf4cef0..3bfd880f3d 100644 --- a/pkg/query-service/druidQuery/query.go +++ b/pkg/query-service/druidQuery/query.go @@ -316,7 +316,7 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA return nil, nil } -func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) ([]godruid.ScanResult, error) { +func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) (*[]model.SearchSpansResult, error) { filter, err := buildFilters(queryParams) @@ -347,7 +347,16 @@ func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) ([ // fmt.Printf("query.QueryResult:\n%v", query.QueryResult) - return query.QueryResult, nil + var searchSpansResult []model.SearchSpansResult + searchSpansResult = make([]model.SearchSpansResult, len(query.QueryResult)) + + searchSpansResult[0].Columns = make([]string, len(query.QueryResult[0].Columns)) + copy(searchSpansResult[0].Columns, query.QueryResult[0].Columns) + + searchSpansResult[0].Events = make([][]interface{}, len(query.QueryResult[0].Events)) + copy(searchSpansResult[0].Events, query.QueryResult[0].Events) + + return &searchSpansResult, nil } func GetApplicationPercentiles(client *godruid.Client, queryParams *model.ApplicationPercentileParams) ([]godruid.Timeseries, error) { diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index f0c4f4d8cc..aa22ba4d99 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -1,6 +1,9 @@ package model -import "fmt" +import ( + "fmt" + "time" +) type GetTopEndpointsParams struct { StartTime string @@ -19,11 +22,15 @@ type GetServicesParams struct { StartTime string EndTime string Period int + Start *time.Time + End *time.Time } type GetServiceOverviewParams struct { StartTime string EndTime string + Start *time.Time + End *time.Time ServiceName string Period string StepSeconds int @@ -67,6 +74,8 @@ type SpanSearchParams struct { OperationName string Kind string Intervals string + Start *time.Time + End *time.Time MinDuration string MaxDuration string Limit int64 diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 4acb528d7b..fca92f5765 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -1,5 +1,10 @@ package model +import ( + "strconv" + "time" +) + type ServiceItem struct { ServiceName string `json:"serviceName" db:"serviceName"` Percentile99 float32 `json:"p99" db:"p99"` @@ -25,15 +30,40 @@ type ServiceErrorItem struct { } type ServiceOverviewItem struct { - Time string `json:"time,omitempty"` - Timestamp int64 `json:"timestamp"` - Percentile50 float32 `json:"p50"` - Percentile95 float32 `json:"p95"` - Percentile99 float32 `json:"p99"` - NumCalls int `json:"numCalls"` - CallRate float32 `json:"callRate"` - NumErrors int `json:"numErrors"` - ErrorRate float32 `json:"errorRate"` + Time string `json:"time,omitempty" db:"time,omitempty"` + Timestamp int64 `json:"timestamp" db:"timestamp"` + Percentile50 float32 `json:"p50" db:"p50"` + Percentile95 float32 `json:"p95" db:"p95"` + Percentile99 float32 `json:"p99" db:"p99"` + NumCalls int `json:"numCalls" db:"numCalls"` + CallRate float32 `json:"callRate" db:"callRate"` + NumErrors int `json:"numErrors" db:"numErrors"` + ErrorRate float32 `json:"errorRate" db:"errorRate"` +} + +type SearchSpansResult struct { + Columns []string `json:"columns"` + Events [][]interface{} `json:"events"` +} +type SearchSpanReponseItem struct { + Timestamp string `db:"timestamp"` + SpanID string `db:"spanID"` + TraceID string `db:"traceID"` + ServiceName string `db:"serviceName"` + Name string `db:"name"` + Kind int32 `db:"kind"` + DurationNano int64 `db:"durationNano"` + TagsKeys []string `db:"tagsKeys"` + TagsValues []string `db:"tagsValues"` +} + +func (item *SearchSpanReponseItem) GetValues() []interface{} { + + timeObj, _ := time.Parse(time.RFC3339Nano, item.Timestamp) + + returnArray := []interface{}{int64(timeObj.UnixNano() / 1000000), item.SpanID, item.TraceID, item.ServiceName, item.Name, strconv.Itoa(int(item.Kind)), strconv.FormatInt(item.DurationNano, 10), item.TagsKeys, item.TagsValues} + + return returnArray } type ServiceExternalItem struct {