diff --git a/pkg/processors/flattener/main.go b/pkg/processors/flattener/main.go index 493a6bcb17..0e974ba2de 100644 --- a/pkg/processors/flattener/main.go +++ b/pkg/processors/flattener/main.go @@ -42,19 +42,26 @@ var ( ) type Span struct { - TraceId string - SpanId string - ParentSpanId string - Name string - DurationNano uint64 - StartTimeUnixNano uint64 - ServiceName string - Kind int32 - StatusCode int64 - References []OtelSpanRef - Tags []string - TagsKeys []string - TagsValues []string + TraceId string + SpanId string + ParentSpanId string + Name string + DurationNano uint64 + StartTimeUnixNano uint64 + ServiceName string + Kind int32 + References []OtelSpanRef + Tags []string + TagsKeys []string + TagsValues []string + StatusCode int64 + ExternalHttpMethod string + ExternalHttpUrl string + Component string + DBSystem string + DBName string + DBOperation string + PeerService string } type OtelSpanRef struct { @@ -195,6 +202,39 @@ func byteSlice2string(byteSlice []byte) string { return hex.EncodeToString(byteSlice) } +func populateOtherDimensions(attributes pdata.AttributeMap, span *Span) { + + attributes.ForEach(func(k string, v pdata.AttributeValue) { + if k == "http.status_code" { + span.StatusCode = v.IntVal() + } + if k == "http.url" { + span.ExternalHttpUrl = v.StringVal() + } + if k == "http.method" { + span.ExternalHttpMethod = v.StringVal() + } + if k == "component" { + span.Component = v.StringVal() + } + + if k == "db.system" { + span.DBSystem = v.StringVal() + } + if k == "db.name" { + span.DBName = v.StringVal() + } + if k == "db.operation" { + span.DBOperation = v.StringVal() + } + if k == "peer.service" { + span.PeerService = v.StringVal() + } + + }) + +} + func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span { durationNano := uint64(otelSpan.EndTime() - otelSpan.StartTime()) @@ -202,8 +242,6 @@ func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span { spanID_bytes := otelSpan.SpanID().Bytes() parentSpanID_bytes := otelSpan.ParentSpanID().Bytes() - var statusCode int64 - attributes := otelSpan.Attributes() var tags []string @@ -217,9 +255,6 @@ func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span { } else { tag = fmt.Sprintf("%s:%s", k, v.StringVal()) } - if k == "http.status_code" { - statusCode = v.IntVal() - } tags = append(tags, tag) tagsKeys = append(tagsKeys, k) @@ -228,7 +263,7 @@ func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span { references, _ := makeJaegerProtoReferences(otelSpan.Links(), otelSpan.ParentSpanID(), otelSpan.TraceID()) - return &Span{ + var span *Span = &Span{ TraceId: hex.EncodeToString(traceID_bytes[:]), SpanId: hex.EncodeToString(spanID_bytes[:]), ParentSpanId: hex.EncodeToString(parentSpanID_bytes[:]), @@ -237,12 +272,15 @@ func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span { DurationNano: durationNano, ServiceName: ServiceName, Kind: int32(otelSpan.Kind()), - StatusCode: statusCode, References: references, Tags: tags, TagsKeys: tagsKeys, TagsValues: tagsValues, } + + populateOtherDimensions(attributes, span) + + return span } // ServiceNameForResource gets the service name for a specified Resource. diff --git a/pkg/query-service/__debug_bin b/pkg/query-service/__debug_bin new file mode 100755 index 0000000000..2f81072b62 Binary files /dev/null and b/pkg/query-service/__debug_bin differ diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index b0b6c88d00..b7cf48c85e 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -63,6 +63,9 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/services", aH.getServices).Methods(http.MethodGet) router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet) + router.HandleFunc("/api/v1/service/externalAvgDuration", aH.GetServiceExternalAvgDuration).Methods(http.MethodGet) + router.HandleFunc("/api/v1/service/externalErrors", aH.getServiceExternalErrors).Methods(http.MethodGet) + router.HandleFunc("/api/v1/service/external", aH.getServiceExternal).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet) router.HandleFunc("/api/v1/spans", aH.searchSpans).Methods(http.MethodGet) @@ -175,6 +178,54 @@ func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) { } +func (aH *APIHandler) getServiceExternal(w http.ResponseWriter, r *http.Request) { + + query, err := parseGetServiceExternalRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, err := druidQuery.GetServiceExternal(aH.sqlClient, query) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + aH.writeJSON(w, r, result) + +} + +func (aH *APIHandler) GetServiceExternalAvgDuration(w http.ResponseWriter, r *http.Request) { + + query, err := parseGetServiceExternalRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, err := druidQuery.GetServiceExternalAvgDuration(aH.sqlClient, query) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + aH.writeJSON(w, r, result) + +} + +func (aH *APIHandler) getServiceExternalErrors(w http.ResponseWriter, r *http.Request) { + + query, err := parseGetServiceExternalRequest(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + result, err := druidQuery.GetServiceExternalErrors(aH.sqlClient, query) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + + aH.writeJSON(w, r, result) + +} + func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) { query, err := parseGetServiceOverviewRequest(r) diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 79e519eafa..a6be154c0a 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -76,6 +76,42 @@ func parseGetUsageRequest(r *http.Request) (*model.GetUsageParams, error) { } +func parseGetServiceExternalRequest(r *http.Request) (*model.GetServiceOverviewParams, error) { + startTime, err := parseTime("start", r) + if err != nil { + return nil, err + } + endTime, err := parseTime("end", r) + if err != nil { + return nil, err + } + + stepStr := r.URL.Query().Get("step") + if len(stepStr) == 0 { + return nil, errors.New("step param missing in query") + } + stepInt, err := strconv.Atoi(stepStr) + if err != nil { + return nil, errors.New("step param is not in correct format") + } + + serviceName := r.URL.Query().Get("service") + if len(serviceName) == 0 { + return nil, errors.New("serviceName param missing in query") + } + + getServiceOverviewParams := model.GetServiceOverviewParams{ + StartTime: startTime.Format(time.RFC3339Nano), + EndTime: endTime.Format(time.RFC3339Nano), + ServiceName: serviceName, + Period: fmt.Sprintf("PT%dM", stepInt/60), + StepSeconds: stepInt, + } + + return &getServiceOverviewParams, nil + +} + func parseGetServiceOverviewRequest(r *http.Request) (*model.GetServiceOverviewParams, error) { startTime, err := parseTime("start", r) if err != nil { diff --git a/pkg/query-service/druidQuery/mysql-query.go b/pkg/query-service/druidQuery/mysql-query.go index 4444dbd9a7..e3b6d88e1f 100644 --- a/pkg/query-service/druidQuery/mysql-query.go +++ b/pkg/query-service/druidQuery/mysql-query.go @@ -32,6 +32,15 @@ type ServiceOverviewItem struct { ErrorRate float32 `json:"errorRate"` } +type ServiceExternalItem struct { + Time string `json:"time,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` + ExternalHttpUrl string `json:"externalHttpUrl,omitempty"` + AvgDuration float32 `json:"avgDuration,omitempty"` + NumCalls int `json:"numCalls,omitempty"` + CallRate float32 `json:"callRate,omitempty"` +} + type UsageItem struct { Time string `json:"time,omitempty"` Timestamp int64 `json:"timestamp"` @@ -207,6 +216,115 @@ func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]UsageItem, err return &usageResponse, nil } +func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) { + + sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration" FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND "__time" >= '%s' AND "__time" <= '%s' + GROUP BY TIME_FLOOR(__time, '%s'), ExternalHttpUrl`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period) + + // zap.S().Debug(sqlQuery) + + response, err := client.Query(sqlQuery, "object") + + if err != nil { + zap.S().Error(query, err) + return nil, fmt.Errorf("Something went wrong in druid query") + } + + // responseStr := string(response) + // zap.S().Info(responseStr) + + res := new([]ServiceExternalItem) + err = json.Unmarshal(response, res) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("Error in unmarshalling response from druid") + } + + for i, _ := range *res { + timeObj, _ := time.Parse(time.RFC3339Nano, (*res)[i].Time) + (*res)[i].Timestamp = int64(timeObj.UnixNano()) + (*res)[i].Time = "" + (*res)[i].CallRate = float32((*res)[i].NumCalls) / float32(query.StepSeconds) + + } + + servicesExternalResponse := (*res)[1:] + return &servicesExternalResponse, nil +} + +func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) { + + sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", COUNT(SpanId) as "numCalls", ExternalHttpUrl as externalHttpUrl FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND StatusCode >= 500 AND "__time" >= '%s' AND "__time" <= '%s' + GROUP BY TIME_FLOOR(__time, '%s'), ExternalHttpUrl`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period) + + // zap.S().Debug(sqlQuery) + + response, err := client.Query(sqlQuery, "object") + + if err != nil { + zap.S().Error(query, err) + return nil, fmt.Errorf("Something went wrong in druid query") + } + + // responseStr := string(response) + // zap.S().Info(responseStr) + + res := new([]ServiceExternalItem) + err = json.Unmarshal(response, res) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("Error in unmarshalling response from druid") + } + + for i, _ := range *res { + timeObj, _ := time.Parse(time.RFC3339Nano, (*res)[i].Time) + (*res)[i].Timestamp = int64(timeObj.UnixNano()) + (*res)[i].Time = "" + (*res)[i].CallRate = float32((*res)[i].NumCalls) / float32(query.StepSeconds) + + } + + servicesExternalResponse := (*res)[1:] + return &servicesExternalResponse, nil +} + +func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) { + + sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration", COUNT(SpanId) as "numCalls", ExternalHttpUrl as externalHttpUrl FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' + AND "__time" >= '%s' AND "__time" <= '%s' + GROUP BY TIME_FLOOR(__time, '%s'), ExternalHttpUrl`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period) + + // zap.S().Debug(sqlQuery) + + response, err := client.Query(sqlQuery, "object") + + if err != nil { + zap.S().Error(query, err) + return nil, fmt.Errorf("Something went wrong in druid query") + } + + // responseStr := string(response) + // zap.S().Info(responseStr) + + res := new([]ServiceExternalItem) + err = json.Unmarshal(response, res) + if err != nil { + zap.S().Error(err) + return nil, fmt.Errorf("Error in unmarshalling response from druid") + } + + for i, _ := range *res { + timeObj, _ := time.Parse(time.RFC3339Nano, (*res)[i].Time) + (*res)[i].Timestamp = int64(timeObj.UnixNano()) + (*res)[i].Time = "" + (*res)[i].CallRate = float32((*res)[i].NumCalls) / float32(query.StepSeconds) + + } + + servicesExternalResponse := (*res)[1:] + return &servicesExternalResponse, nil +} + func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceOverviewItem, error) { sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.9) as p90,