mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-12 19:59:04 +08:00
More methods from interface implemented for ClickHouse
This commit is contained in:
parent
6aed23ce66
commit
51fe634566
Binary file not shown.
@ -178,10 +178,6 @@ func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.S
|
||||
|
||||
}
|
||||
|
||||
// // zap.S().Debug("MinDuration: ", queryParams.MinDuration)
|
||||
// var lower string
|
||||
// var upper string
|
||||
|
||||
if len(queryParams.MinDuration) != 0 {
|
||||
query = query + " AND durationNano >= ?"
|
||||
args = append(args, queryParams.MinDuration)
|
||||
@ -209,8 +205,9 @@ func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.S
|
||||
}
|
||||
|
||||
if item.Key == "error" && item.Value == "true" {
|
||||
|
||||
query = query + " OR statusCode>=500"
|
||||
}
|
||||
query = query + " ORDER BY timestamp LIMIT 100"
|
||||
|
||||
}
|
||||
|
||||
@ -301,11 +298,11 @@ func (r *ClickHouseReader) GetServiceExternalAvgDuration(ctx context.Context, qu
|
||||
|
||||
func (r *ClickHouseReader) GetServiceExternalErrors(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
|
||||
var serviceExternalItems []model.ServiceExternalItem
|
||||
var serviceExternalErrorItems []model.ServiceExternalItem
|
||||
|
||||
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL AND StatusCode >= 500 GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL AND statusCode >= 500 GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceExternalItems, query)
|
||||
err := r.db.Select(&serviceExternalErrorItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
@ -313,19 +310,44 @@ func (r *ClickHouseReader) GetServiceExternalErrors(ctx context.Context, queryPa
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
var serviceExternalTotalItems []model.ServiceExternalItem
|
||||
|
||||
for i, _ := range serviceExternalItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time)
|
||||
serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
serviceExternalItems[i].Time = ""
|
||||
serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds)
|
||||
queryTotal := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
errTotal := r.db.Select(&serviceExternalTotalItems, queryTotal)
|
||||
|
||||
if errTotal != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
if serviceExternalItems == nil {
|
||||
serviceExternalItems = []model.ServiceExternalItem{}
|
||||
m := make(map[string]int)
|
||||
|
||||
for j, _ := range serviceExternalErrorItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalErrorItems[j].Time)
|
||||
m[strconv.FormatInt(timeObj.UnixNano(), 10)+"-"+serviceExternalErrorItems[j].ExternalHttpUrl] = serviceExternalErrorItems[j].NumCalls
|
||||
}
|
||||
|
||||
return &serviceExternalItems, nil
|
||||
for i, _ := range serviceExternalTotalItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalTotalItems[i].Time)
|
||||
serviceExternalTotalItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
serviceExternalTotalItems[i].Time = ""
|
||||
// serviceExternalTotalItems[i].CallRate = float32(serviceExternalTotalItems[i].NumCalls) / float32(queryParams.StepSeconds)
|
||||
|
||||
if val, ok := m[strconv.FormatInt(serviceExternalTotalItems[i].Timestamp, 10)+"-"+serviceExternalTotalItems[i].ExternalHttpUrl]; ok {
|
||||
serviceExternalTotalItems[i].NumErrors = val
|
||||
serviceExternalTotalItems[i].ErrorRate = float32(serviceExternalTotalItems[i].NumErrors) * 100 / float32(serviceExternalTotalItems[i].NumCalls)
|
||||
}
|
||||
serviceExternalTotalItems[i].CallRate = 0
|
||||
serviceExternalTotalItems[i].NumCalls = 0
|
||||
|
||||
}
|
||||
|
||||
if serviceExternalTotalItems == nil {
|
||||
serviceExternalTotalItems = []model.ServiceExternalItem{}
|
||||
}
|
||||
|
||||
return &serviceExternalTotalItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServiceExternal(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
@ -356,3 +378,185 @@ func (r *ClickHouseReader) GetServiceExternal(ctx context.Context, queryParams *
|
||||
|
||||
return &serviceExternalItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTopEndpoints(ctx context.Context, queryParams *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) {
|
||||
|
||||
var topEndpointsItems []model.TopEndpointsItem
|
||||
|
||||
query := fmt.Sprintf("SELECT quantile(0.5)(durationNano) as p50, quantile(0.95)(durationNano) as p95, quantile(0.99)(durationNano) as p99, COUNT(1) as numCalls, name FROM %s WHERE timestamp >= '%s' AND timestamp <= '%s' AND kind='2' and serviceName='%s' GROUP BY name", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName)
|
||||
|
||||
err := r.db.Select(&topEndpointsItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
if topEndpointsItems == nil {
|
||||
topEndpointsItems = []model.TopEndpointsItem{}
|
||||
}
|
||||
|
||||
return &topEndpointsItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) {
|
||||
|
||||
var usageItems []model.UsageItem
|
||||
|
||||
var query string
|
||||
if len(queryParams.ServiceName) != 0 {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
} else {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE timestamp>='%s' AND timestamp<='%s' GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
}
|
||||
|
||||
err := r.db.Select(&usageItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
for i, _ := range usageItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, usageItems[i].Time)
|
||||
usageItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
usageItems[i].Time = ""
|
||||
}
|
||||
|
||||
if usageItems == nil {
|
||||
usageItems = []model.UsageItem{}
|
||||
}
|
||||
|
||||
return &usageItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) {
|
||||
|
||||
services := []string{}
|
||||
|
||||
query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable)
|
||||
|
||||
err := r.db.Select(&services, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
return &services, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) {
|
||||
|
||||
tagItems := []model.TagItem{}
|
||||
|
||||
query := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagsKeys) as tagKeys FROM %s WHERE serviceName='%s' AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable, serviceName)
|
||||
|
||||
err := r.db.Select(&tagItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
return &tagItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) {
|
||||
|
||||
operations := []string{}
|
||||
|
||||
query := fmt.Sprintf(`SELECT DISTINCT(name) FROM %s WHERE serviceName='%s' AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable, serviceName)
|
||||
|
||||
err := r.db.Select(&operations, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
return &operations, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) {
|
||||
|
||||
var searchScanReponses []model.SearchSpanReponseItem
|
||||
|
||||
query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues, references FROM %s WHERE traceID='%s'", r.indexTable, traceId)
|
||||
|
||||
err := r.db.Select(&searchScanReponses, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
searchSpansResult := []model.SearchSpansResult{
|
||||
model.SearchSpansResult{
|
||||
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References"},
|
||||
Events: make([][]interface{}, len(searchScanReponses)),
|
||||
},
|
||||
}
|
||||
|
||||
for i, item := range searchScanReponses {
|
||||
spanEvents := item.GetValues()
|
||||
searchSpansResult[0].Events[i] = spanEvents
|
||||
}
|
||||
|
||||
return &searchSpansResult, nil
|
||||
|
||||
}
|
||||
func (r *ClickHouseReader) GetServiceMapDependencies(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
|
||||
serviceMapDependencyItems := []model.ServiceMapDependencyItem{}
|
||||
|
||||
query := fmt.Sprintf(`SELECT spanID, parentSpanID, serviceName FROM %s WHERE timestamp>='%s' AND timestamp<='%s'`, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceMapDependencyItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
serviceMap := make(map[string]*model.ServiceMapDependencyResponseItem)
|
||||
|
||||
spanId2ServiceNameMap := make(map[string]string)
|
||||
for i, _ := range serviceMapDependencyItems {
|
||||
spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId] = serviceMapDependencyItems[i].ServiceName
|
||||
}
|
||||
for i, _ := range serviceMapDependencyItems {
|
||||
parent2childServiceName := spanId2ServiceNameMap[serviceMapDependencyItems[i].ParentSpanId] + "-" + spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId]
|
||||
if _, ok := serviceMap[parent2childServiceName]; !ok {
|
||||
serviceMap[parent2childServiceName] = &model.ServiceMapDependencyResponseItem{
|
||||
Parent: spanId2ServiceNameMap[serviceMapDependencyItems[i].ParentSpanId],
|
||||
Child: spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId],
|
||||
CallCount: 1,
|
||||
}
|
||||
} else {
|
||||
serviceMap[parent2childServiceName].CallCount++
|
||||
}
|
||||
}
|
||||
|
||||
retMe := make([]model.ServiceMapDependencyResponseItem, 0, len(serviceMap))
|
||||
for _, dependency := range serviceMap {
|
||||
if dependency.Parent == "" {
|
||||
continue
|
||||
}
|
||||
retMe = append(retMe, *dependency)
|
||||
}
|
||||
|
||||
return &retMe, nil
|
||||
}
|
||||
|
@ -66,3 +66,31 @@ func (druid *DruidReader) GetServiceExternalErrors(ctx context.Context, query *m
|
||||
func (druid *DruidReader) GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
return druidQuery.GetServiceExternal(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetTopEndpoints(ctx context.Context, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) {
|
||||
return druidQuery.GetTopEndpoints(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error) {
|
||||
return druidQuery.GetUsage(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) {
|
||||
return druidQuery.GetOperations(druid.SqlClient, serviceName)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) {
|
||||
return druidQuery.GetTags(druid.SqlClient, serviceName)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServicesList(ctx context.Context) (*[]string, error) {
|
||||
return druidQuery.GetServicesList(druid.SqlClient)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) {
|
||||
return druidQuery.SearchTraces(druid.Client, traceId)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
|
||||
return druidQuery.GetServiceMapDependencies(druid.SqlClient, query)
|
||||
}
|
||||
|
@ -58,20 +58,20 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
|
||||
router.HandleFunc("/api/v1/user", aH.user).Methods(http.MethodPost)
|
||||
// router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/services", aH.getServices).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/service/dbOverview", aH.getServiceDBOverview).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/service/externalAvgDuration", aH.GetServiceExternalAvgDuration).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/service/externalErrors", aH.getServiceExternalErrors).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/service/external", aH.getServiceExternal).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/service/{service}/operations", aH.getOperations).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/service/top_endpoints", aH.getTopEndpoints).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/spans", aH.searchSpans).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/spans/aggregates", aH.searchSpansAggregates).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/tags", aH.searchTags).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/traces/{traceId}", aH.searchTraces).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/usage", aH.getUsage).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/serviceMapDependencies", aH.serviceMapDependencies).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/tags", aH.searchTags).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/traces/{traceId}", aH.searchTraces).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/usage", aH.getUsage).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/serviceMapDependencies", aH.serviceMapDependencies).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) user(w http.ResponseWriter, r *http.Request) {
|
||||
@ -99,83 +99,84 @@ func (aH *APIHandler) user(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
// func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
|
||||
func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// vars := mux.Vars(r)
|
||||
// serviceName := vars["service"]
|
||||
vars := mux.Vars(r)
|
||||
serviceName := vars["service"]
|
||||
|
||||
// var err error
|
||||
// if len(serviceName) == 0 {
|
||||
// err = fmt.Errorf("service param not found")
|
||||
// }
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
var err error
|
||||
if len(serviceName) == 0 {
|
||||
err = fmt.Errorf("service param not found")
|
||||
}
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// result, err := druidQuery.GetOperations(aH.sqlClient, serviceName)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
result, err := (*aH.reader).GetOperations(context.Background(), serviceName)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// aH.writeJSON(w, r, result)
|
||||
aH.writeJSON(w, r, result)
|
||||
|
||||
// }
|
||||
}
|
||||
|
||||
// func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) {
|
||||
func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// result, err := druidQuery.GetServicesList(aH.sqlClient)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
result, err := (*aH.reader).GetServicesList(context.Background())
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// aH.writeJSON(w, r, result)
|
||||
aH.writeJSON(w, r, result)
|
||||
|
||||
// }
|
||||
}
|
||||
|
||||
// func (aH *APIHandler) searchTags(w http.ResponseWriter, r *http.Request) {
|
||||
func (aH *APIHandler) searchTags(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// serviceName := r.URL.Query().Get("service")
|
||||
serviceName := r.URL.Query().Get("service")
|
||||
|
||||
// result, err := druidQuery.GetTags(aH.sqlClient, serviceName)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
result, err := (*aH.reader).GetTags(context.Background(), serviceName)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// aH.writeJSON(w, r, result)
|
||||
aH.writeJSON(w, r, result)
|
||||
|
||||
// }
|
||||
}
|
||||
|
||||
// func (aH *APIHandler) getTopEndpoints(w http.ResponseWriter, r *http.Request) {
|
||||
func (aH *APIHandler) getTopEndpoints(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// query, err := parseGetTopEndpointsRequest(r)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
query, err := parseGetTopEndpointsRequest(r)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// result, err := druidQuery.GetTopEndpoints(aH.sqlClient, query)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
result, err := (*aH.reader).GetTopEndpoints(context.Background(), query)
|
||||
|
||||
// aH.writeJSON(w, r, result)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// }
|
||||
aH.writeJSON(w, r, result)
|
||||
|
||||
// func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// query, err := parseGetUsageRequest(r)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// result, err := druidQuery.GetUsage(aH.sqlClient, query)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
query, err := parseGetUsageRequest(r)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// aH.writeJSON(w, r, result)
|
||||
result, err := (*aH.reader).GetUsage(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// }
|
||||
aH.writeJSON(w, r, result)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getServiceDBOverview(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@ -280,34 +281,35 @@ func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) {
|
||||
aH.writeJSON(w, r, result)
|
||||
}
|
||||
|
||||
// func (aH *APIHandler) serviceMapDependencies(w http.ResponseWriter, r *http.Request) {
|
||||
func (aH *APIHandler) serviceMapDependencies(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// query, err := parseGetServicesRequest(r)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
query, err := parseGetServicesRequest(r)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// result, err := druidQuery.GetServiceMapDependencies(aH.sqlClient, query)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
result, err := (*aH.reader).GetServiceMapDependencies(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// aH.writeJSON(w, r, result)
|
||||
// }
|
||||
aH.writeJSON(w, r, result)
|
||||
}
|
||||
|
||||
// func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
|
||||
func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// vars := mux.Vars(r)
|
||||
// traceId := vars["traceId"]
|
||||
vars := mux.Vars(r)
|
||||
traceId := vars["traceId"]
|
||||
|
||||
// result, err := druidQuery.SearchTraces(aH.client, traceId)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
result, err := (*aH.reader).SearchTraces(context.Background(), traceId)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
|
||||
// aH.writeJSON(w, r, result)
|
||||
aH.writeJSON(w, r, result)
|
||||
|
||||
}
|
||||
|
||||
// }
|
||||
// func (aH *APIHandler) searchSpansAggregates(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// query, err := parseSearchSpanAggregatesRequest(r)
|
||||
|
@ -15,4 +15,11 @@ type Reader interface {
|
||||
GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
||||
GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
||||
GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
||||
GetTopEndpoints(ctx context.Context, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error)
|
||||
GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error)
|
||||
GetOperations(ctx context.Context, serviceName string) (*[]string, error)
|
||||
GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error)
|
||||
GetServicesList(ctx context.Context) (*[]string, error)
|
||||
SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error)
|
||||
GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
|
||||
}
|
||||
|
@ -38,6 +38,8 @@ func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams,
|
||||
StartTime: startTime.Format(time.RFC3339Nano),
|
||||
EndTime: endTime.Format(time.RFC3339Nano),
|
||||
ServiceName: serviceName,
|
||||
Start: startTime,
|
||||
End: endTime,
|
||||
}
|
||||
|
||||
return &getTopEndpointsParams, nil
|
||||
@ -64,12 +66,16 @@ func parseGetUsageRequest(r *http.Request) (*model.GetUsageParams, error) {
|
||||
}
|
||||
|
||||
serviceName := r.URL.Query().Get("service")
|
||||
stepHour := stepInt / 3600
|
||||
|
||||
getUsageParams := model.GetUsageParams{
|
||||
StartTime: startTime.Format(time.RFC3339Nano),
|
||||
EndTime: endTime.Format(time.RFC3339Nano),
|
||||
Start: startTime,
|
||||
End: endTime,
|
||||
ServiceName: serviceName,
|
||||
Period: fmt.Sprintf("PT%dH", stepInt/3600),
|
||||
Period: fmt.Sprintf("PT%dH", stepHour),
|
||||
StepHour: stepHour,
|
||||
}
|
||||
|
||||
return &getUsageParams, nil
|
||||
|
@ -97,8 +97,10 @@ func createHTTPServer() (*http.Server, error) {
|
||||
|
||||
storage := os.Getenv("STORAGE")
|
||||
if storage == "druid" {
|
||||
zap.S().Info("Using Apache Druid as datastore ...")
|
||||
reader = druidReader.NewReader()
|
||||
} else if storage == "clickhouse" {
|
||||
zap.S().Info("Using ClickHouse as datastore ...")
|
||||
reader = clickhouseReader.NewReader()
|
||||
} else {
|
||||
return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage)
|
||||
|
@ -101,9 +101,9 @@ func GetTags(client *SqlClient, serviceName string) (*[]model.TagItem, error) {
|
||||
return &tagResponse, nil
|
||||
}
|
||||
|
||||
func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]model.TopEnpointsItem, error) {
|
||||
func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.9) as p90, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT(SpanId) as numCalls, Name FROM "%s" WHERE "__time" >= '%s' AND "__time" <= '%s' AND "Kind"='2' and "ServiceName"='%s' GROUP BY Name`, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName)
|
||||
sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.95) as p95, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT(SpanId) as numCalls, Name FROM "%s" WHERE "__time" >= '%s' AND "__time" <= '%s' AND "Kind"='2' and "ServiceName"='%s' GROUP BY Name`, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName)
|
||||
|
||||
// zap.S().Debug(sqlQuery)
|
||||
|
||||
@ -116,7 +116,7 @@ func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
res := new([]model.TopEnpointsItem)
|
||||
res := new([]model.TopEndpointsItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -517,7 +517,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]model.Se
|
||||
|
||||
func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT SpanId, ParentSpanId, ServiceName FROM %s WHERE "__time" >= '%s' AND "__time" <= '%s' ORDER BY __time DESC LIMIT 100000`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
sqlQuery := fmt.Sprintf(`SELECT SpanId, ParentSpanId, ServiceName FROM %s WHERE "__time" >= '%s' AND "__time" <= '%s' ORDER BY __time DESC`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
|
||||
// zap.S().Debug(sqlQuery)
|
||||
|
||||
|
@ -181,7 +181,7 @@ func buildFiltersForSpansAggregates(queryParams *model.SpanSearchAggregatesParam
|
||||
|
||||
}
|
||||
|
||||
func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult, error) {
|
||||
func SearchTraces(client *godruid.Client, traceId string) (*[]model.SearchSpansResult, error) {
|
||||
|
||||
filter := godruid.FilterSelector("TraceId", traceId)
|
||||
|
||||
@ -206,7 +206,17 @@ func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult,
|
||||
|
||||
// fmt.Printf("query.QueryResult:\n%v", query.QueryResult)
|
||||
|
||||
return query.QueryResult, nil
|
||||
var searchSpansResult []model.SearchSpansResult
|
||||
searchSpansResult = make([]model.SearchSpansResult, len(query.QueryResult))
|
||||
|
||||
searchSpansResult[0].Columns = make([]string, len(query.QueryResult[0].Columns))
|
||||
copy(searchSpansResult[0].Columns, query.QueryResult[0].Columns)
|
||||
|
||||
searchSpansResult[0].Events = make([][]interface{}, len(query.QueryResult[0].Events))
|
||||
copy(searchSpansResult[0].Events, query.QueryResult[0].Events)
|
||||
|
||||
return &searchSpansResult, nil
|
||||
|
||||
}
|
||||
|
||||
func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchAggregatesParams) ([]SpanSearchAggregatesResponseItem, error) {
|
||||
|
@ -9,6 +9,8 @@ type GetTopEndpointsParams struct {
|
||||
StartTime string
|
||||
EndTime string
|
||||
ServiceName string
|
||||
Start *time.Time
|
||||
End *time.Time
|
||||
}
|
||||
|
||||
type GetUsageParams struct {
|
||||
@ -16,6 +18,9 @@ type GetUsageParams struct {
|
||||
EndTime string
|
||||
ServiceName string
|
||||
Period string
|
||||
StepHour int
|
||||
Start *time.Time
|
||||
End *time.Time
|
||||
}
|
||||
|
||||
type GetServicesParams struct {
|
||||
|
@ -1,6 +1,8 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
@ -45,7 +47,18 @@ type SearchSpansResult struct {
|
||||
Columns []string `json:"columns"`
|
||||
Events [][]interface{} `json:"events"`
|
||||
}
|
||||
type SearchSpanReponseItem struct {
|
||||
|
||||
type TraceResult struct {
|
||||
Data []interface{} `json:"data" db:"data"`
|
||||
Total int `json:"total" db:"total"`
|
||||
Limit int `json:"limit" db:"limit"`
|
||||
Offset int `json:"offset" db:"offset"`
|
||||
}
|
||||
type TraceResultItem struct {
|
||||
TraceID string
|
||||
Spans []TraceResultSpan
|
||||
}
|
||||
type TraceResultSpan struct {
|
||||
Timestamp string `db:"timestamp"`
|
||||
SpanID string `db:"spanID"`
|
||||
TraceID string `db:"traceID"`
|
||||
@ -57,11 +70,44 @@ type SearchSpanReponseItem struct {
|
||||
TagsValues []string `db:"tagsValues"`
|
||||
}
|
||||
|
||||
type SearchSpanReponseItem struct {
|
||||
Timestamp string `db:"timestamp"`
|
||||
SpanID string `db:"spanID"`
|
||||
TraceID string `db:"traceID"`
|
||||
ServiceName string `db:"serviceName"`
|
||||
Name string `db:"name"`
|
||||
Kind int32 `db:"kind"`
|
||||
References string `db:"references,omitempty"`
|
||||
DurationNano int64 `db:"durationNano"`
|
||||
TagsKeys []string `db:"tagsKeys"`
|
||||
TagsValues []string `db:"tagsValues"`
|
||||
}
|
||||
|
||||
type OtelSpanRef struct {
|
||||
TraceId string `json:"traceId,omitempty"`
|
||||
SpanId string `json:"spanId,omitempty"`
|
||||
RefType string `json:"refType,omitempty"`
|
||||
}
|
||||
|
||||
func (ref *OtelSpanRef) toString() string {
|
||||
|
||||
retString := fmt.Sprintf(`{TraceId=%s, SpanId=%s, RefType=%s}`, ref.TraceId, ref.SpanId, ref.RefType)
|
||||
|
||||
return retString
|
||||
}
|
||||
|
||||
func (item *SearchSpanReponseItem) GetValues() []interface{} {
|
||||
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, item.Timestamp)
|
||||
references := []OtelSpanRef{}
|
||||
json.Unmarshal([]byte(item.References), &references)
|
||||
|
||||
returnArray := []interface{}{int64(timeObj.UnixNano() / 1000000), item.SpanID, item.TraceID, item.ServiceName, item.Name, strconv.Itoa(int(item.Kind)), strconv.FormatInt(item.DurationNano, 10), item.TagsKeys, item.TagsValues}
|
||||
referencesStringArray := []string{}
|
||||
for _, item := range references {
|
||||
referencesStringArray = append(referencesStringArray, item.toString())
|
||||
}
|
||||
|
||||
returnArray := []interface{}{int64(timeObj.UnixNano() / 1000000), item.SpanID, item.TraceID, item.ServiceName, item.Name, strconv.Itoa(int(item.Kind)), strconv.FormatInt(item.DurationNano, 10), item.TagsKeys, item.TagsValues, referencesStringArray}
|
||||
|
||||
return returnArray
|
||||
}
|
||||
@ -87,32 +133,32 @@ type ServiceDBOverviewItem struct {
|
||||
}
|
||||
|
||||
type ServiceMapDependencyItem struct {
|
||||
SpanId string `json:"spanId,omitempty"`
|
||||
ParentSpanId string `json:"parentSpanId,omitempty"`
|
||||
ServiceName string `json:"serviceName,omitempty"`
|
||||
SpanId string `json:"spanId,omitempty" db:"spanID,omitempty"`
|
||||
ParentSpanId string `json:"parentSpanId,omitempty" db:"parentSpanID,omitempty"`
|
||||
ServiceName string `json:"serviceName,omitempty" db:"serviceName,omitempty"`
|
||||
}
|
||||
|
||||
type UsageItem struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Count int64 `json:"count"`
|
||||
Time string `json:"time,omitempty" db:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp" db:"timestamp"`
|
||||
Count int64 `json:"count" db:"count"`
|
||||
}
|
||||
|
||||
type TopEnpointsItem struct {
|
||||
Percentile50 float32 `json:"p50"`
|
||||
Percentile90 float32 `json:"p90"`
|
||||
Percentile99 float32 `json:"p99"`
|
||||
NumCalls int `json:"numCalls"`
|
||||
Name string `json:"name"`
|
||||
type TopEndpointsItem struct {
|
||||
Percentile50 float32 `json:"p50" db:"p50"`
|
||||
Percentile95 float32 `json:"p95" db:"p95"`
|
||||
Percentile99 float32 `json:"p99" db:"p99"`
|
||||
NumCalls int `json:"numCalls" db:"numCalls"`
|
||||
Name string `json:"name" db:"name"`
|
||||
}
|
||||
|
||||
type TagItem struct {
|
||||
TagKeys string `json:"tagKeys"`
|
||||
TagCount int `json:"tagCount"`
|
||||
TagKeys string `json:"tagKeys" db:"tagKeys"`
|
||||
TagCount int `json:"tagCount" db:"tagCount"`
|
||||
}
|
||||
|
||||
type ServiceMapDependencyResponseItem struct {
|
||||
Parent string `json:"parent,omitempty"`
|
||||
Child string `json:"child,omitempty"`
|
||||
CallCount int `json:"callCount,omitempty"`
|
||||
Parent string `json:"parent,omitempty" db:"parent,omitempty"`
|
||||
Child string `json:"child,omitempty" db:"child,omitempty"`
|
||||
CallCount int `json:"callCount,omitempty" db:"callCount,omitempty"`
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user