External API Calls commit for query-service and flatten-processor

This commit is contained in:
Ankit Nayan 2021-04-26 21:55:37 +05:30
commit 99c073d6da
5 changed files with 263 additions and 20 deletions

View File

@ -42,19 +42,26 @@ var (
) )
type Span struct { type Span struct {
TraceId string TraceId string
SpanId string SpanId string
ParentSpanId string ParentSpanId string
Name string Name string
DurationNano uint64 DurationNano uint64
StartTimeUnixNano uint64 StartTimeUnixNano uint64
ServiceName string ServiceName string
Kind int32 Kind int32
StatusCode int64 References []OtelSpanRef
References []OtelSpanRef Tags []string
Tags []string TagsKeys []string
TagsKeys []string TagsValues []string
TagsValues []string StatusCode int64
ExternalHttpMethod string
ExternalHttpUrl string
Component string
DBSystem string
DBName string
DBOperation string
PeerService string
} }
type OtelSpanRef struct { type OtelSpanRef struct {
@ -195,6 +202,39 @@ func byteSlice2string(byteSlice []byte) string {
return hex.EncodeToString(byteSlice) return hex.EncodeToString(byteSlice)
} }
func populateOtherDimensions(attributes pdata.AttributeMap, span *Span) {
attributes.ForEach(func(k string, v pdata.AttributeValue) {
if k == "http.status_code" {
span.StatusCode = v.IntVal()
}
if k == "http.url" {
span.ExternalHttpUrl = v.StringVal()
}
if k == "http.method" {
span.ExternalHttpMethod = v.StringVal()
}
if k == "component" {
span.Component = v.StringVal()
}
if k == "db.system" {
span.DBSystem = v.StringVal()
}
if k == "db.name" {
span.DBName = v.StringVal()
}
if k == "db.operation" {
span.DBOperation = v.StringVal()
}
if k == "peer.service" {
span.PeerService = v.StringVal()
}
})
}
func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span { func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span {
durationNano := uint64(otelSpan.EndTime() - otelSpan.StartTime()) durationNano := uint64(otelSpan.EndTime() - otelSpan.StartTime())
@ -202,8 +242,6 @@ func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span {
spanID_bytes := otelSpan.SpanID().Bytes() spanID_bytes := otelSpan.SpanID().Bytes()
parentSpanID_bytes := otelSpan.ParentSpanID().Bytes() parentSpanID_bytes := otelSpan.ParentSpanID().Bytes()
var statusCode int64
attributes := otelSpan.Attributes() attributes := otelSpan.Attributes()
var tags []string var tags []string
@ -217,9 +255,6 @@ func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span {
} else { } else {
tag = fmt.Sprintf("%s:%s", k, v.StringVal()) tag = fmt.Sprintf("%s:%s", k, v.StringVal())
} }
if k == "http.status_code" {
statusCode = v.IntVal()
}
tags = append(tags, tag) tags = append(tags, tag)
tagsKeys = append(tagsKeys, k) tagsKeys = append(tagsKeys, k)
@ -228,7 +263,7 @@ func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span {
references, _ := makeJaegerProtoReferences(otelSpan.Links(), otelSpan.ParentSpanID(), otelSpan.TraceID()) references, _ := makeJaegerProtoReferences(otelSpan.Links(), otelSpan.ParentSpanID(), otelSpan.TraceID())
return &Span{ var span *Span = &Span{
TraceId: hex.EncodeToString(traceID_bytes[:]), TraceId: hex.EncodeToString(traceID_bytes[:]),
SpanId: hex.EncodeToString(spanID_bytes[:]), SpanId: hex.EncodeToString(spanID_bytes[:]),
ParentSpanId: hex.EncodeToString(parentSpanID_bytes[:]), ParentSpanId: hex.EncodeToString(parentSpanID_bytes[:]),
@ -237,12 +272,15 @@ func newStructuredSpan(otelSpan pdata.Span, ServiceName string) *Span {
DurationNano: durationNano, DurationNano: durationNano,
ServiceName: ServiceName, ServiceName: ServiceName,
Kind: int32(otelSpan.Kind()), Kind: int32(otelSpan.Kind()),
StatusCode: statusCode,
References: references, References: references,
Tags: tags, Tags: tags,
TagsKeys: tagsKeys, TagsKeys: tagsKeys,
TagsValues: tagsValues, TagsValues: tagsValues,
} }
populateOtherDimensions(attributes, span)
return span
} }
// ServiceNameForResource gets the service name for a specified Resource. // ServiceNameForResource gets the service name for a specified Resource.

BIN
pkg/query-service/__debug_bin Executable file

Binary file not shown.

View File

@ -63,6 +63,9 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/services", aH.getServices).Methods(http.MethodGet) router.HandleFunc("/api/v1/services", aH.getServices).Methods(http.MethodGet)
router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet) router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/externalAvgDuration", aH.GetServiceExternalAvgDuration).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/externalErrors", aH.getServiceExternalErrors).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/external", aH.getServiceExternal).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet)
router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet)
router.HandleFunc("/api/v1/spans", aH.searchSpans).Methods(http.MethodGet) router.HandleFunc("/api/v1/spans", aH.searchSpans).Methods(http.MethodGet)
@ -175,6 +178,54 @@ func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) {
} }
func (aH *APIHandler) getServiceExternal(w http.ResponseWriter, r *http.Request) {
query, err := parseGetServiceExternalRequest(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
result, err := druidQuery.GetServiceExternal(aH.sqlClient, query)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
aH.writeJSON(w, r, result)
}
func (aH *APIHandler) GetServiceExternalAvgDuration(w http.ResponseWriter, r *http.Request) {
query, err := parseGetServiceExternalRequest(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
result, err := druidQuery.GetServiceExternalAvgDuration(aH.sqlClient, query)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
aH.writeJSON(w, r, result)
}
func (aH *APIHandler) getServiceExternalErrors(w http.ResponseWriter, r *http.Request) {
query, err := parseGetServiceExternalRequest(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
result, err := druidQuery.GetServiceExternalErrors(aH.sqlClient, query)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
aH.writeJSON(w, r, result)
}
func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) {
query, err := parseGetServiceOverviewRequest(r) query, err := parseGetServiceOverviewRequest(r)

View File

@ -76,6 +76,42 @@ func parseGetUsageRequest(r *http.Request) (*model.GetUsageParams, error) {
} }
func parseGetServiceExternalRequest(r *http.Request) (*model.GetServiceOverviewParams, error) {
startTime, err := parseTime("start", r)
if err != nil {
return nil, err
}
endTime, err := parseTime("end", r)
if err != nil {
return nil, err
}
stepStr := r.URL.Query().Get("step")
if len(stepStr) == 0 {
return nil, errors.New("step param missing in query")
}
stepInt, err := strconv.Atoi(stepStr)
if err != nil {
return nil, errors.New("step param is not in correct format")
}
serviceName := r.URL.Query().Get("service")
if len(serviceName) == 0 {
return nil, errors.New("serviceName param missing in query")
}
getServiceOverviewParams := model.GetServiceOverviewParams{
StartTime: startTime.Format(time.RFC3339Nano),
EndTime: endTime.Format(time.RFC3339Nano),
ServiceName: serviceName,
Period: fmt.Sprintf("PT%dM", stepInt/60),
StepSeconds: stepInt,
}
return &getServiceOverviewParams, nil
}
func parseGetServiceOverviewRequest(r *http.Request) (*model.GetServiceOverviewParams, error) { func parseGetServiceOverviewRequest(r *http.Request) (*model.GetServiceOverviewParams, error) {
startTime, err := parseTime("start", r) startTime, err := parseTime("start", r)
if err != nil { if err != nil {

View File

@ -32,6 +32,15 @@ type ServiceOverviewItem struct {
ErrorRate float32 `json:"errorRate"` ErrorRate float32 `json:"errorRate"`
} }
type ServiceExternalItem struct {
Time string `json:"time,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
ExternalHttpUrl string `json:"externalHttpUrl,omitempty"`
AvgDuration float32 `json:"avgDuration,omitempty"`
NumCalls int `json:"numCalls,omitempty"`
CallRate float32 `json:"callRate,omitempty"`
}
type UsageItem struct { type UsageItem struct {
Time string `json:"time,omitempty"` Time string `json:"time,omitempty"`
Timestamp int64 `json:"timestamp"` Timestamp int64 `json:"timestamp"`
@ -207,6 +216,115 @@ func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]UsageItem, err
return &usageResponse, nil return &usageResponse, nil
} }
func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) {
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration" FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND "__time" >= '%s' AND "__time" <= '%s'
GROUP BY TIME_FLOOR(__time, '%s'), ExternalHttpUrl`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period)
// zap.S().Debug(sqlQuery)
response, err := client.Query(sqlQuery, "object")
if err != nil {
zap.S().Error(query, err)
return nil, fmt.Errorf("Something went wrong in druid query")
}
// responseStr := string(response)
// zap.S().Info(responseStr)
res := new([]ServiceExternalItem)
err = json.Unmarshal(response, res)
if err != nil {
zap.S().Error(err)
return nil, fmt.Errorf("Error in unmarshalling response from druid")
}
for i, _ := range *res {
timeObj, _ := time.Parse(time.RFC3339Nano, (*res)[i].Time)
(*res)[i].Timestamp = int64(timeObj.UnixNano())
(*res)[i].Time = ""
(*res)[i].CallRate = float32((*res)[i].NumCalls) / float32(query.StepSeconds)
}
servicesExternalResponse := (*res)[1:]
return &servicesExternalResponse, nil
}
func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) {
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", COUNT(SpanId) as "numCalls", ExternalHttpUrl as externalHttpUrl FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND StatusCode >= 500 AND "__time" >= '%s' AND "__time" <= '%s'
GROUP BY TIME_FLOOR(__time, '%s'), ExternalHttpUrl`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period)
// zap.S().Debug(sqlQuery)
response, err := client.Query(sqlQuery, "object")
if err != nil {
zap.S().Error(query, err)
return nil, fmt.Errorf("Something went wrong in druid query")
}
// responseStr := string(response)
// zap.S().Info(responseStr)
res := new([]ServiceExternalItem)
err = json.Unmarshal(response, res)
if err != nil {
zap.S().Error(err)
return nil, fmt.Errorf("Error in unmarshalling response from druid")
}
for i, _ := range *res {
timeObj, _ := time.Parse(time.RFC3339Nano, (*res)[i].Time)
(*res)[i].Timestamp = int64(timeObj.UnixNano())
(*res)[i].Time = ""
(*res)[i].CallRate = float32((*res)[i].NumCalls) / float32(query.StepSeconds)
}
servicesExternalResponse := (*res)[1:]
return &servicesExternalResponse, nil
}
func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) {
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration", COUNT(SpanId) as "numCalls", ExternalHttpUrl as externalHttpUrl FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != ''
AND "__time" >= '%s' AND "__time" <= '%s'
GROUP BY TIME_FLOOR(__time, '%s'), ExternalHttpUrl`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period)
// zap.S().Debug(sqlQuery)
response, err := client.Query(sqlQuery, "object")
if err != nil {
zap.S().Error(query, err)
return nil, fmt.Errorf("Something went wrong in druid query")
}
// responseStr := string(response)
// zap.S().Info(responseStr)
res := new([]ServiceExternalItem)
err = json.Unmarshal(response, res)
if err != nil {
zap.S().Error(err)
return nil, fmt.Errorf("Error in unmarshalling response from druid")
}
for i, _ := range *res {
timeObj, _ := time.Parse(time.RFC3339Nano, (*res)[i].Time)
(*res)[i].Timestamp = int64(timeObj.UnixNano())
(*res)[i].Time = ""
(*res)[i].CallRate = float32((*res)[i].NumCalls) / float32(query.StepSeconds)
}
servicesExternalResponse := (*res)[1:]
return &servicesExternalResponse, nil
}
func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceOverviewItem, error) { func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceOverviewItem, error) {
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.9) as p90, sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.9) as p90,