diff --git a/pkg/query-service/__debug_bin b/pkg/query-service/__debug_bin index 9fdc242cb7..d405ae39d1 100755 Binary files a/pkg/query-service/__debug_bin and b/pkg/query-service/__debug_bin differ diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 4964138b33..7ade2a6bf9 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -178,10 +178,6 @@ func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.S } - // // zap.S().Debug("MinDuration: ", queryParams.MinDuration) - // var lower string - // var upper string - if len(queryParams.MinDuration) != 0 { query = query + " AND durationNano >= ?" args = append(args, queryParams.MinDuration) @@ -209,8 +205,9 @@ func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.S } if item.Key == "error" && item.Value == "true" { - + query = query + " OR statusCode>=500" } + query = query + " ORDER BY timestamp LIMIT 100" } @@ -301,11 +298,11 @@ func (r *ClickHouseReader) GetServiceExternalAvgDuration(ctx context.Context, qu func (r *ClickHouseReader) GetServiceExternalErrors(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { - var serviceExternalItems []model.ServiceExternalItem + var serviceExternalErrorItems []model.ServiceExternalItem - query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL AND StatusCode >= 500 GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL AND statusCode >= 500 GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) - err := r.db.Select(&serviceExternalItems, query) + err := r.db.Select(&serviceExternalErrorItems, query) zap.S().Info(query) @@ -313,19 +310,44 @@ func (r *ClickHouseReader) GetServiceExternalErrors(ctx context.Context, queryPa zap.S().Debug("Error in processing sql query: ", err) return nil, fmt.Errorf("Error in processing sql query") } + var serviceExternalTotalItems []model.ServiceExternalItem - for i, _ := range serviceExternalItems { - timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time) - serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano()) - serviceExternalItems[i].Time = "" - serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds) + queryTotal := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + + errTotal := r.db.Select(&serviceExternalTotalItems, queryTotal) + + if errTotal != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") } - if serviceExternalItems == nil { - serviceExternalItems = []model.ServiceExternalItem{} + m := make(map[string]int) + + for j, _ := range serviceExternalErrorItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalErrorItems[j].Time) + m[strconv.FormatInt(timeObj.UnixNano(), 10)+"-"+serviceExternalErrorItems[j].ExternalHttpUrl] = serviceExternalErrorItems[j].NumCalls } - return &serviceExternalItems, nil + for i, _ := range serviceExternalTotalItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalTotalItems[i].Time) + serviceExternalTotalItems[i].Timestamp = int64(timeObj.UnixNano()) + serviceExternalTotalItems[i].Time = "" + // serviceExternalTotalItems[i].CallRate = float32(serviceExternalTotalItems[i].NumCalls) / float32(queryParams.StepSeconds) + + if val, ok := m[strconv.FormatInt(serviceExternalTotalItems[i].Timestamp, 10)+"-"+serviceExternalTotalItems[i].ExternalHttpUrl]; ok { + serviceExternalTotalItems[i].NumErrors = val + serviceExternalTotalItems[i].ErrorRate = float32(serviceExternalTotalItems[i].NumErrors) * 100 / float32(serviceExternalTotalItems[i].NumCalls) + } + serviceExternalTotalItems[i].CallRate = 0 + serviceExternalTotalItems[i].NumCalls = 0 + + } + + if serviceExternalTotalItems == nil { + serviceExternalTotalItems = []model.ServiceExternalItem{} + } + + return &serviceExternalTotalItems, nil } func (r *ClickHouseReader) GetServiceExternal(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { @@ -356,3 +378,185 @@ func (r *ClickHouseReader) GetServiceExternal(ctx context.Context, queryParams * return &serviceExternalItems, nil } + +func (r *ClickHouseReader) GetTopEndpoints(ctx context.Context, queryParams *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) { + + var topEndpointsItems []model.TopEndpointsItem + + query := fmt.Sprintf("SELECT quantile(0.5)(durationNano) as p50, quantile(0.95)(durationNano) as p95, quantile(0.99)(durationNano) as p99, COUNT(1) as numCalls, name FROM %s WHERE timestamp >= '%s' AND timestamp <= '%s' AND kind='2' and serviceName='%s' GROUP BY name", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName) + + err := r.db.Select(&topEndpointsItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + if topEndpointsItems == nil { + topEndpointsItems = []model.TopEndpointsItem{} + } + + return &topEndpointsItems, nil +} + +func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) { + + var usageItems []model.UsageItem + + var query string + if len(queryParams.ServiceName) != 0 { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + } else { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE timestamp>='%s' AND timestamp<='%s' GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + } + + err := r.db.Select(&usageItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + for i, _ := range usageItems { + timeObj, _ := time.Parse(time.RFC3339Nano, usageItems[i].Time) + usageItems[i].Timestamp = int64(timeObj.UnixNano()) + usageItems[i].Time = "" + } + + if usageItems == nil { + usageItems = []model.UsageItem{} + } + + return &usageItems, nil +} + +func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) { + + services := []string{} + + query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable) + + err := r.db.Select(&services, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + return &services, nil +} + +func (r *ClickHouseReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) { + + tagItems := []model.TagItem{} + + query := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagsKeys) as tagKeys FROM %s WHERE serviceName='%s' AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable, serviceName) + + err := r.db.Select(&tagItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + return &tagItems, nil +} + +func (r *ClickHouseReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) { + + operations := []string{} + + query := fmt.Sprintf(`SELECT DISTINCT(name) FROM %s WHERE serviceName='%s' AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable, serviceName) + + err := r.db.Select(&operations, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + return &operations, nil +} + +func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) { + + var searchScanReponses []model.SearchSpanReponseItem + + query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues, references FROM %s WHERE traceID='%s'", r.indexTable, traceId) + + err := r.db.Select(&searchScanReponses, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + searchSpansResult := []model.SearchSpansResult{ + model.SearchSpansResult{ + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References"}, + Events: make([][]interface{}, len(searchScanReponses)), + }, + } + + for i, item := range searchScanReponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } + + return &searchSpansResult, nil + +} +func (r *ClickHouseReader) GetServiceMapDependencies(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) { + serviceMapDependencyItems := []model.ServiceMapDependencyItem{} + + query := fmt.Sprintf(`SELECT spanID, parentSpanID, serviceName FROM %s WHERE timestamp>='%s' AND timestamp<='%s'`, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + + err := r.db.Select(&serviceMapDependencyItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + serviceMap := make(map[string]*model.ServiceMapDependencyResponseItem) + + spanId2ServiceNameMap := make(map[string]string) + for i, _ := range serviceMapDependencyItems { + spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId] = serviceMapDependencyItems[i].ServiceName + } + for i, _ := range serviceMapDependencyItems { + parent2childServiceName := spanId2ServiceNameMap[serviceMapDependencyItems[i].ParentSpanId] + "-" + spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId] + if _, ok := serviceMap[parent2childServiceName]; !ok { + serviceMap[parent2childServiceName] = &model.ServiceMapDependencyResponseItem{ + Parent: spanId2ServiceNameMap[serviceMapDependencyItems[i].ParentSpanId], + Child: spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId], + CallCount: 1, + } + } else { + serviceMap[parent2childServiceName].CallCount++ + } + } + + retMe := make([]model.ServiceMapDependencyResponseItem, 0, len(serviceMap)) + for _, dependency := range serviceMap { + if dependency.Parent == "" { + continue + } + retMe = append(retMe, *dependency) + } + + return &retMe, nil +} diff --git a/pkg/query-service/app/druidReader/reader.go b/pkg/query-service/app/druidReader/reader.go index a6d3e2819f..6d5e8d6bb0 100644 --- a/pkg/query-service/app/druidReader/reader.go +++ b/pkg/query-service/app/druidReader/reader.go @@ -66,3 +66,31 @@ func (druid *DruidReader) GetServiceExternalErrors(ctx context.Context, query *m func (druid *DruidReader) GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { return druidQuery.GetServiceExternal(druid.SqlClient, query) } + +func (druid *DruidReader) GetTopEndpoints(ctx context.Context, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) { + return druidQuery.GetTopEndpoints(druid.SqlClient, query) +} + +func (druid *DruidReader) GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error) { + return druidQuery.GetUsage(druid.SqlClient, query) +} + +func (druid *DruidReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) { + return druidQuery.GetOperations(druid.SqlClient, serviceName) +} + +func (druid *DruidReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) { + return druidQuery.GetTags(druid.SqlClient, serviceName) +} + +func (druid *DruidReader) GetServicesList(ctx context.Context) (*[]string, error) { + return druidQuery.GetServicesList(druid.SqlClient) +} + +func (druid *DruidReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) { + return druidQuery.SearchTraces(druid.Client, traceId) +} + +func (druid *DruidReader) GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) { + return druidQuery.GetServiceMapDependencies(druid.SqlClient, query) +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 7db677191c..27bc583319 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -58,20 +58,20 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/user", aH.user).Methods(http.MethodPost) // router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet) router.HandleFunc("/api/v1/services", aH.getServices).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet) + router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/dbOverview", aH.getServiceDBOverview).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/externalAvgDuration", aH.GetServiceExternalAvgDuration).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/externalErrors", aH.getServiceExternalErrors).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/external", aH.getServiceExternal).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet) + router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet) + router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet) router.HandleFunc("/api/v1/spans", aH.searchSpans).Methods(http.MethodGet) // router.HandleFunc("/api/v1/spans/aggregates", aH.searchSpansAggregates).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/tags", aH.searchTags).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/traces/{traceId}", aH.searchTraces).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/usage", aH.getUsage).Methods(http.MethodGet) - // router.HandleFunc("/api/v1/serviceMapDependencies", aH.serviceMapDependencies).Methods(http.MethodGet) + router.HandleFunc("/api/v1/tags", aH.searchTags).Methods(http.MethodGet) + router.HandleFunc("/api/v1/traces/{traceId}", aH.searchTraces).Methods(http.MethodGet) + router.HandleFunc("/api/v1/usage", aH.getUsage).Methods(http.MethodGet) + router.HandleFunc("/api/v1/serviceMapDependencies", aH.serviceMapDependencies).Methods(http.MethodGet) } func (aH *APIHandler) user(w http.ResponseWriter, r *http.Request) { @@ -99,83 +99,84 @@ func (aH *APIHandler) user(w http.ResponseWriter, r *http.Request) { } -// func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// serviceName := vars["service"] + vars := mux.Vars(r) + serviceName := vars["service"] -// var err error -// if len(serviceName) == 0 { -// err = fmt.Errorf("service param not found") -// } -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + var err error + if len(serviceName) == 0 { + err = fmt.Errorf("service param not found") + } + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// result, err := druidQuery.GetOperations(aH.sqlClient, serviceName) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + result, err := (*aH.reader).GetOperations(context.Background(), serviceName) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// aH.writeJSON(w, r, result) + aH.writeJSON(w, r, result) -// } +} -// func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) { -// result, err := druidQuery.GetServicesList(aH.sqlClient) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + result, err := (*aH.reader).GetServicesList(context.Background()) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// aH.writeJSON(w, r, result) + aH.writeJSON(w, r, result) -// } +} -// func (aH *APIHandler) searchTags(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) searchTags(w http.ResponseWriter, r *http.Request) { -// serviceName := r.URL.Query().Get("service") + serviceName := r.URL.Query().Get("service") -// result, err := druidQuery.GetTags(aH.sqlClient, serviceName) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + result, err := (*aH.reader).GetTags(context.Background(), serviceName) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// aH.writeJSON(w, r, result) + aH.writeJSON(w, r, result) -// } +} -// func (aH *APIHandler) getTopEndpoints(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) getTopEndpoints(w http.ResponseWriter, r *http.Request) { -// query, err := parseGetTopEndpointsRequest(r) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + query, err := parseGetTopEndpointsRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// result, err := druidQuery.GetTopEndpoints(aH.sqlClient, query) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + result, err := (*aH.reader).GetTopEndpoints(context.Background(), query) -// aH.writeJSON(w, r, result) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// } + aH.writeJSON(w, r, result) -// func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) { +} -// query, err := parseGetUsageRequest(r) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } +func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) { -// result, err := druidQuery.GetUsage(aH.sqlClient, query) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + query, err := parseGetUsageRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// aH.writeJSON(w, r, result) + result, err := (*aH.reader).GetUsage(context.Background(), query) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// } + aH.writeJSON(w, r, result) + +} func (aH *APIHandler) getServiceDBOverview(w http.ResponseWriter, r *http.Request) { @@ -280,34 +281,35 @@ func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) { aH.writeJSON(w, r, result) } -// func (aH *APIHandler) serviceMapDependencies(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) serviceMapDependencies(w http.ResponseWriter, r *http.Request) { -// query, err := parseGetServicesRequest(r) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + query, err := parseGetServicesRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// result, err := druidQuery.GetServiceMapDependencies(aH.sqlClient, query) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + result, err := (*aH.reader).GetServiceMapDependencies(context.Background(), query) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// aH.writeJSON(w, r, result) -// } + aH.writeJSON(w, r, result) +} -// func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) { +func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) { -// vars := mux.Vars(r) -// traceId := vars["traceId"] + vars := mux.Vars(r) + traceId := vars["traceId"] -// result, err := druidQuery.SearchTraces(aH.client, traceId) -// if aH.handleError(w, err, http.StatusBadRequest) { -// return -// } + result, err := (*aH.reader).SearchTraces(context.Background(), traceId) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } -// aH.writeJSON(w, r, result) + aH.writeJSON(w, r, result) + +} -// } // func (aH *APIHandler) searchSpansAggregates(w http.ResponseWriter, r *http.Request) { // query, err := parseSearchSpanAggregatesRequest(r) diff --git a/pkg/query-service/app/interface.go b/pkg/query-service/app/interface.go index b2cb5eb966..107ade68c7 100644 --- a/pkg/query-service/app/interface.go +++ b/pkg/query-service/app/interface.go @@ -15,4 +15,11 @@ type Reader interface { GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) + GetTopEndpoints(ctx context.Context, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) + GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error) + GetOperations(ctx context.Context, serviceName string) (*[]string, error) + GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) + GetServicesList(ctx context.Context) (*[]string, error) + SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error) + GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) } diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 025516a3a4..a586dc6f8b 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -38,6 +38,8 @@ func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams, StartTime: startTime.Format(time.RFC3339Nano), EndTime: endTime.Format(time.RFC3339Nano), ServiceName: serviceName, + Start: startTime, + End: endTime, } return &getTopEndpointsParams, nil @@ -64,12 +66,16 @@ func parseGetUsageRequest(r *http.Request) (*model.GetUsageParams, error) { } serviceName := r.URL.Query().Get("service") + stepHour := stepInt / 3600 getUsageParams := model.GetUsageParams{ StartTime: startTime.Format(time.RFC3339Nano), EndTime: endTime.Format(time.RFC3339Nano), + Start: startTime, + End: endTime, ServiceName: serviceName, - Period: fmt.Sprintf("PT%dH", stepInt/3600), + Period: fmt.Sprintf("PT%dH", stepHour), + StepHour: stepHour, } return &getUsageParams, nil diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index cecc254caf..99c47f5539 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -97,8 +97,10 @@ func createHTTPServer() (*http.Server, error) { storage := os.Getenv("STORAGE") if storage == "druid" { + zap.S().Info("Using Apache Druid as datastore ...") reader = druidReader.NewReader() } else if storage == "clickhouse" { + zap.S().Info("Using ClickHouse as datastore ...") reader = clickhouseReader.NewReader() } else { return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage) diff --git a/pkg/query-service/druidQuery/mysql-query.go b/pkg/query-service/druidQuery/mysql-query.go index 4465a9b51b..f8aa92829e 100644 --- a/pkg/query-service/druidQuery/mysql-query.go +++ b/pkg/query-service/druidQuery/mysql-query.go @@ -101,9 +101,9 @@ func GetTags(client *SqlClient, serviceName string) (*[]model.TagItem, error) { return &tagResponse, nil } -func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]model.TopEnpointsItem, error) { +func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) { - sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.9) as p90, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT(SpanId) as numCalls, Name FROM "%s" WHERE "__time" >= '%s' AND "__time" <= '%s' AND "Kind"='2' and "ServiceName"='%s' GROUP BY Name`, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName) + sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.95) as p95, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT(SpanId) as numCalls, Name FROM "%s" WHERE "__time" >= '%s' AND "__time" <= '%s' AND "Kind"='2' and "ServiceName"='%s' GROUP BY Name`, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName) // zap.S().Debug(sqlQuery) @@ -116,7 +116,7 @@ func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[] // zap.S().Info(string(response)) - res := new([]model.TopEnpointsItem) + res := new([]model.TopEndpointsItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -517,7 +517,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]model.Se func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) { - sqlQuery := fmt.Sprintf(`SELECT SpanId, ParentSpanId, ServiceName FROM %s WHERE "__time" >= '%s' AND "__time" <= '%s' ORDER BY __time DESC LIMIT 100000`, constants.DruidDatasource, query.StartTime, query.EndTime) + sqlQuery := fmt.Sprintf(`SELECT SpanId, ParentSpanId, ServiceName FROM %s WHERE "__time" >= '%s' AND "__time" <= '%s' ORDER BY __time DESC`, constants.DruidDatasource, query.StartTime, query.EndTime) // zap.S().Debug(sqlQuery) diff --git a/pkg/query-service/druidQuery/query.go b/pkg/query-service/druidQuery/query.go index 3bfd880f3d..5a2be97943 100644 --- a/pkg/query-service/druidQuery/query.go +++ b/pkg/query-service/druidQuery/query.go @@ -181,7 +181,7 @@ func buildFiltersForSpansAggregates(queryParams *model.SpanSearchAggregatesParam } -func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult, error) { +func SearchTraces(client *godruid.Client, traceId string) (*[]model.SearchSpansResult, error) { filter := godruid.FilterSelector("TraceId", traceId) @@ -206,7 +206,17 @@ func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult, // fmt.Printf("query.QueryResult:\n%v", query.QueryResult) - return query.QueryResult, nil + var searchSpansResult []model.SearchSpansResult + searchSpansResult = make([]model.SearchSpansResult, len(query.QueryResult)) + + searchSpansResult[0].Columns = make([]string, len(query.QueryResult[0].Columns)) + copy(searchSpansResult[0].Columns, query.QueryResult[0].Columns) + + searchSpansResult[0].Events = make([][]interface{}, len(query.QueryResult[0].Events)) + copy(searchSpansResult[0].Events, query.QueryResult[0].Events) + + return &searchSpansResult, nil + } func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchAggregatesParams) ([]SpanSearchAggregatesResponseItem, error) { diff --git a/pkg/query-service/model/queryParams.go b/pkg/query-service/model/queryParams.go index aa22ba4d99..49a0ed5b28 100644 --- a/pkg/query-service/model/queryParams.go +++ b/pkg/query-service/model/queryParams.go @@ -9,6 +9,8 @@ type GetTopEndpointsParams struct { StartTime string EndTime string ServiceName string + Start *time.Time + End *time.Time } type GetUsageParams struct { @@ -16,6 +18,9 @@ type GetUsageParams struct { EndTime string ServiceName string Period string + StepHour int + Start *time.Time + End *time.Time } type GetServicesParams struct { diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index f4147e9765..176d1ac885 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -1,6 +1,8 @@ package model import ( + "encoding/json" + "fmt" "strconv" "time" ) @@ -45,7 +47,18 @@ type SearchSpansResult struct { Columns []string `json:"columns"` Events [][]interface{} `json:"events"` } -type SearchSpanReponseItem struct { + +type TraceResult struct { + Data []interface{} `json:"data" db:"data"` + Total int `json:"total" db:"total"` + Limit int `json:"limit" db:"limit"` + Offset int `json:"offset" db:"offset"` +} +type TraceResultItem struct { + TraceID string + Spans []TraceResultSpan +} +type TraceResultSpan struct { Timestamp string `db:"timestamp"` SpanID string `db:"spanID"` TraceID string `db:"traceID"` @@ -57,11 +70,44 @@ type SearchSpanReponseItem struct { TagsValues []string `db:"tagsValues"` } +type SearchSpanReponseItem struct { + Timestamp string `db:"timestamp"` + SpanID string `db:"spanID"` + TraceID string `db:"traceID"` + ServiceName string `db:"serviceName"` + Name string `db:"name"` + Kind int32 `db:"kind"` + References string `db:"references,omitempty"` + DurationNano int64 `db:"durationNano"` + TagsKeys []string `db:"tagsKeys"` + TagsValues []string `db:"tagsValues"` +} + +type OtelSpanRef struct { + TraceId string `json:"traceId,omitempty"` + SpanId string `json:"spanId,omitempty"` + RefType string `json:"refType,omitempty"` +} + +func (ref *OtelSpanRef) toString() string { + + retString := fmt.Sprintf(`{TraceId=%s, SpanId=%s, RefType=%s}`, ref.TraceId, ref.SpanId, ref.RefType) + + return retString +} + func (item *SearchSpanReponseItem) GetValues() []interface{} { timeObj, _ := time.Parse(time.RFC3339Nano, item.Timestamp) + references := []OtelSpanRef{} + json.Unmarshal([]byte(item.References), &references) - returnArray := []interface{}{int64(timeObj.UnixNano() / 1000000), item.SpanID, item.TraceID, item.ServiceName, item.Name, strconv.Itoa(int(item.Kind)), strconv.FormatInt(item.DurationNano, 10), item.TagsKeys, item.TagsValues} + referencesStringArray := []string{} + for _, item := range references { + referencesStringArray = append(referencesStringArray, item.toString()) + } + + returnArray := []interface{}{int64(timeObj.UnixNano() / 1000000), item.SpanID, item.TraceID, item.ServiceName, item.Name, strconv.Itoa(int(item.Kind)), strconv.FormatInt(item.DurationNano, 10), item.TagsKeys, item.TagsValues, referencesStringArray} return returnArray } @@ -87,32 +133,32 @@ type ServiceDBOverviewItem struct { } type ServiceMapDependencyItem struct { - SpanId string `json:"spanId,omitempty"` - ParentSpanId string `json:"parentSpanId,omitempty"` - ServiceName string `json:"serviceName,omitempty"` + SpanId string `json:"spanId,omitempty" db:"spanID,omitempty"` + ParentSpanId string `json:"parentSpanId,omitempty" db:"parentSpanID,omitempty"` + ServiceName string `json:"serviceName,omitempty" db:"serviceName,omitempty"` } type UsageItem struct { - Time string `json:"time,omitempty"` - Timestamp int64 `json:"timestamp"` - Count int64 `json:"count"` + Time string `json:"time,omitempty" db:"time,omitempty"` + Timestamp int64 `json:"timestamp" db:"timestamp"` + Count int64 `json:"count" db:"count"` } -type TopEnpointsItem struct { - Percentile50 float32 `json:"p50"` - Percentile90 float32 `json:"p90"` - Percentile99 float32 `json:"p99"` - NumCalls int `json:"numCalls"` - Name string `json:"name"` +type TopEndpointsItem struct { + Percentile50 float32 `json:"p50" db:"p50"` + Percentile95 float32 `json:"p95" db:"p95"` + Percentile99 float32 `json:"p99" db:"p99"` + NumCalls int `json:"numCalls" db:"numCalls"` + Name string `json:"name" db:"name"` } type TagItem struct { - TagKeys string `json:"tagKeys"` - TagCount int `json:"tagCount"` + TagKeys string `json:"tagKeys" db:"tagKeys"` + TagCount int `json:"tagCount" db:"tagCount"` } type ServiceMapDependencyResponseItem struct { - Parent string `json:"parent,omitempty"` - Child string `json:"child,omitempty"` - CallCount int `json:"callCount,omitempty"` + Parent string `json:"parent,omitempty" db:"parent,omitempty"` + Child string `json:"child,omitempty" db:"child,omitempty"` + CallCount int `json:"callCount,omitempty" db:"callCount,omitempty"` }