mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-11 15:39:13 +08:00
Merge pull request #144 from SigNoz/query_refactor
Query Service refactor to add interface for APIs
This commit is contained in:
commit
d891c3e118
10
node_modules/.yarn-integrity
generated
vendored
Normal file
10
node_modules/.yarn-integrity
generated
vendored
Normal file
@ -0,0 +1,10 @@
|
||||
{
|
||||
"systemParams": "darwin-x64-83",
|
||||
"modulesFolders": [],
|
||||
"flags": [],
|
||||
"linkedModules": [],
|
||||
"topLevelPatterns": [],
|
||||
"lockfileEntries": {},
|
||||
"files": [],
|
||||
"artifacts": {}
|
||||
}
|
Binary file not shown.
124
pkg/query-service/app/clickhouseReader/options.go
Normal file
124
pkg/query-service/app/clickhouseReader/options.go
Normal file
@ -0,0 +1,124 @@
|
||||
package clickhouseReader
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type Encoding string
|
||||
|
||||
const (
|
||||
// EncodingJSON is used for spans encoded as JSON.
|
||||
EncodingJSON Encoding = "json"
|
||||
// EncodingProto is used for spans encoded as Protobuf.
|
||||
EncodingProto Encoding = "protobuf"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultDatasource string = "tcp://localhost:9000"
|
||||
defaultOperationsTable string = "signoz_operations"
|
||||
defaultIndexTable string = "signoz_index"
|
||||
defaultSpansTable string = "signoz_spans"
|
||||
defaultArchiveSpansTable string = "signoz_archive_spans"
|
||||
defaultWriteBatchDelay time.Duration = 5 * time.Second
|
||||
defaultWriteBatchSize int = 10000
|
||||
defaultEncoding Encoding = EncodingJSON
|
||||
)
|
||||
|
||||
const (
|
||||
suffixEnabled = ".enabled"
|
||||
suffixDatasource = ".datasource"
|
||||
suffixOperationsTable = ".operations-table"
|
||||
suffixIndexTable = ".index-table"
|
||||
suffixSpansTable = ".spans-table"
|
||||
suffixWriteBatchDelay = ".write-batch-delay"
|
||||
suffixWriteBatchSize = ".write-batch-size"
|
||||
suffixEncoding = ".encoding"
|
||||
)
|
||||
|
||||
// NamespaceConfig is Clickhouse's internal configuration data
|
||||
type namespaceConfig struct {
|
||||
namespace string
|
||||
Enabled bool
|
||||
Datasource string
|
||||
OperationsTable string
|
||||
IndexTable string
|
||||
SpansTable string
|
||||
WriteBatchDelay time.Duration
|
||||
WriteBatchSize int
|
||||
Encoding Encoding
|
||||
Connector Connector
|
||||
}
|
||||
|
||||
// Connecto defines how to connect to the database
|
||||
type Connector func(cfg *namespaceConfig) (*sqlx.DB, error)
|
||||
|
||||
func defaultConnector(cfg *namespaceConfig) (*sqlx.DB, error) {
|
||||
db, err := sqlx.Open("clickhouse", cfg.Datasource)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// Options store storage plugin related configs
|
||||
type Options struct {
|
||||
primary *namespaceConfig
|
||||
|
||||
others map[string]*namespaceConfig
|
||||
}
|
||||
|
||||
// NewOptions creates a new Options struct.
|
||||
func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...string) *Options {
|
||||
|
||||
if datasource == "" {
|
||||
datasource = defaultDatasource
|
||||
}
|
||||
|
||||
options := &Options{
|
||||
primary: &namespaceConfig{
|
||||
namespace: primaryNamespace,
|
||||
Enabled: true,
|
||||
Datasource: datasource,
|
||||
OperationsTable: defaultOperationsTable,
|
||||
IndexTable: defaultIndexTable,
|
||||
SpansTable: defaultSpansTable,
|
||||
WriteBatchDelay: defaultWriteBatchDelay,
|
||||
WriteBatchSize: defaultWriteBatchSize,
|
||||
Encoding: defaultEncoding,
|
||||
Connector: defaultConnector,
|
||||
},
|
||||
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
|
||||
}
|
||||
|
||||
for _, namespace := range otherNamespaces {
|
||||
if namespace == archiveNamespace {
|
||||
options.others[namespace] = &namespaceConfig{
|
||||
namespace: namespace,
|
||||
Datasource: datasource,
|
||||
OperationsTable: "",
|
||||
IndexTable: "",
|
||||
SpansTable: defaultArchiveSpansTable,
|
||||
WriteBatchDelay: defaultWriteBatchDelay,
|
||||
WriteBatchSize: defaultWriteBatchSize,
|
||||
Encoding: defaultEncoding,
|
||||
Connector: defaultConnector,
|
||||
}
|
||||
} else {
|
||||
options.others[namespace] = &namespaceConfig{namespace: namespace}
|
||||
}
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
// GetPrimary returns the primary namespace configuration
|
||||
func (opt *Options) getPrimary() *namespaceConfig {
|
||||
return opt.primary
|
||||
}
|
619
pkg/query-service/app/clickhouseReader/reader.go
Normal file
619
pkg/query-service/app/clickhouseReader/reader.go
Normal file
@ -0,0 +1,619 @@
|
||||
package clickhouseReader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
_ "github.com/ClickHouse/clickhouse-go"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
||||
"go.signoz.io/query-service/model"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
primaryNamespace = "clickhouse"
|
||||
archiveNamespace = "clickhouse-archive"
|
||||
|
||||
minTimespanForProgressiveSearch = time.Hour
|
||||
minTimespanForProgressiveSearchMargin = time.Minute
|
||||
maxProgressiveSteps = 4
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNoOperationsTable = errors.New("no operations table supplied")
|
||||
ErrNoIndexTable = errors.New("no index table supplied")
|
||||
ErrStartTimeRequired = errors.New("start time is required for search queries")
|
||||
)
|
||||
|
||||
// SpanWriter for reading spans from ClickHouse
|
||||
type ClickHouseReader struct {
|
||||
db *sqlx.DB
|
||||
operationsTable string
|
||||
indexTable string
|
||||
spansTable string
|
||||
}
|
||||
|
||||
// NewTraceReader returns a TraceReader for the database
|
||||
func NewReader() *ClickHouseReader {
|
||||
|
||||
datasource := os.Getenv("ClickHouseUrl")
|
||||
options := NewOptions(datasource, primaryNamespace, archiveNamespace)
|
||||
db, err := initialize(options)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
}
|
||||
return &ClickHouseReader{
|
||||
db: db,
|
||||
operationsTable: options.primary.OperationsTable,
|
||||
indexTable: options.primary.IndexTable,
|
||||
spansTable: options.primary.SpansTable,
|
||||
}
|
||||
}
|
||||
|
||||
func initialize(options *Options) (*sqlx.DB, error) {
|
||||
|
||||
db, err := connect(options.getPrimary())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error connecting to primary db: %v", err)
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func connect(cfg *namespaceConfig) (*sqlx.DB, error) {
|
||||
if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto {
|
||||
return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto)
|
||||
}
|
||||
|
||||
return cfg.Connector(cfg)
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, error) {
|
||||
|
||||
if r.indexTable == "" {
|
||||
return nil, ErrNoIndexTable
|
||||
}
|
||||
|
||||
serviceItems := []model.ServiceItem{}
|
||||
|
||||
query := fmt.Sprintf("SELECT serviceName, quantile(0.99)(durationNano) as p99, avg(durationNano) as avgDuration, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' GROUP BY serviceName ORDER BY p99 DESC", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
////////////////// Below block gets 5xx of services
|
||||
serviceErrorItems := []model.ServiceItem{}
|
||||
|
||||
query = fmt.Sprintf("SELECT serviceName, count(*) as numErrors FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND statusCode>=500 GROUP BY serviceName", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err = r.db.Select(&serviceErrorItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
m5xx := make(map[string]int)
|
||||
|
||||
for j, _ := range serviceErrorItems {
|
||||
m5xx[serviceErrorItems[j].ServiceName] = serviceErrorItems[j].NumErrors
|
||||
}
|
||||
///////////////////////////////////////////
|
||||
|
||||
////////////////// Below block gets 4xx of services
|
||||
|
||||
service4xxItems := []model.ServiceItem{}
|
||||
|
||||
query = fmt.Sprintf("SELECT serviceName, count(*) as num4xx FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND statusCode>=400 AND statusCode<500 GROUP BY serviceName", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err = r.db.Select(&service4xxItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
m4xx := make(map[string]int)
|
||||
|
||||
for j, _ := range service4xxItems {
|
||||
m5xx[service4xxItems[j].ServiceName] = service4xxItems[j].Num4XX
|
||||
}
|
||||
|
||||
for i, _ := range serviceItems {
|
||||
if val, ok := m5xx[serviceItems[i].ServiceName]; ok {
|
||||
serviceItems[i].NumErrors = val
|
||||
}
|
||||
if val, ok := m4xx[serviceItems[i].ServiceName]; ok {
|
||||
serviceItems[i].Num4XX = val
|
||||
}
|
||||
serviceItems[i].CallRate = float32(serviceItems[i].NumCalls) / float32(queryParams.Period)
|
||||
serviceItems[i].FourXXRate = float32(serviceItems[i].Num4XX) / float32(queryParams.Period)
|
||||
serviceItems[i].ErrorRate = float32(serviceItems[i].NumErrors) / float32(queryParams.Period)
|
||||
}
|
||||
|
||||
return &serviceItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) {
|
||||
|
||||
serviceOverviewItems := []model.ServiceOverviewItem{}
|
||||
|
||||
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, quantile(0.99)(durationNano) as p99, quantile(0.95)(durationNano) as p95,quantile(0.50)(durationNano) as p50, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND serviceName='%s' GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName)
|
||||
|
||||
err := r.db.Select(&serviceOverviewItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
serviceErrorItems := []model.ServiceErrorItem{}
|
||||
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, count(*) as numErrors FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND serviceName='%s' AND statusCode>=500 GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName)
|
||||
|
||||
err = r.db.Select(&serviceErrorItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
m := make(map[int64]int)
|
||||
|
||||
for j, _ := range serviceErrorItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceErrorItems[j].Time)
|
||||
m[int64(timeObj.UnixNano())] = serviceErrorItems[j].NumErrors
|
||||
}
|
||||
|
||||
for i, _ := range serviceOverviewItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceOverviewItems[i].Time)
|
||||
serviceOverviewItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
serviceOverviewItems[i].Time = ""
|
||||
|
||||
if val, ok := m[serviceOverviewItems[i].Timestamp]; ok {
|
||||
serviceOverviewItems[i].NumErrors = val
|
||||
}
|
||||
serviceOverviewItems[i].ErrorRate = float32(serviceOverviewItems[i].NumErrors) * 100 / float32(serviceOverviewItems[i].NumCalls)
|
||||
serviceOverviewItems[i].CallRate = float32(serviceOverviewItems[i].NumCalls) / float32(queryParams.StepSeconds)
|
||||
}
|
||||
|
||||
return &serviceOverviewItems, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.SpanSearchParams) (*[]model.SearchSpansResult, error) {
|
||||
|
||||
query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable)
|
||||
|
||||
args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}
|
||||
|
||||
if len(queryParams.ServiceName) != 0 {
|
||||
query = query + " AND serviceName = ?"
|
||||
args = append(args, queryParams.ServiceName)
|
||||
}
|
||||
|
||||
if len(queryParams.OperationName) != 0 {
|
||||
|
||||
query = query + " AND name = ?"
|
||||
args = append(args, queryParams.OperationName)
|
||||
|
||||
}
|
||||
|
||||
if len(queryParams.Kind) != 0 {
|
||||
query = query + " AND kind = ?"
|
||||
args = append(args, queryParams.Kind)
|
||||
|
||||
}
|
||||
|
||||
if len(queryParams.MinDuration) != 0 {
|
||||
query = query + " AND durationNano >= ?"
|
||||
args = append(args, queryParams.MinDuration)
|
||||
}
|
||||
if len(queryParams.MaxDuration) != 0 {
|
||||
query = query + " AND durationNano <= ?"
|
||||
args = append(args, queryParams.MaxDuration)
|
||||
}
|
||||
|
||||
for _, item := range queryParams.Tags {
|
||||
|
||||
if item.Key == "error" && item.Value == "true" {
|
||||
query = query + " AND ( has(tags, 'error:true') OR statusCode>=500)"
|
||||
continue
|
||||
}
|
||||
|
||||
if item.Operator == "equals" {
|
||||
query = query + " AND has(tags, ?)"
|
||||
args = append(args, fmt.Sprintf("%s:%s", item.Key, item.Value))
|
||||
|
||||
} else if item.Operator == "contains" {
|
||||
query = query + " AND tagsValues[indexOf(tagsKeys, ?)] ILIKE ?"
|
||||
args = append(args, item.Key)
|
||||
args = append(args, fmt.Sprintf("%%%s%%", item.Value))
|
||||
} else if item.Operator == "isnotnull" {
|
||||
query = query + " AND has(tagsKeys, ?)"
|
||||
args = append(args, item.Key)
|
||||
} else {
|
||||
return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
query = query + " ORDER BY timestamp DESC LIMIT 100"
|
||||
|
||||
var searchScanReponses []model.SearchSpanReponseItem
|
||||
|
||||
err := r.db.Select(&searchScanReponses, query, args...)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
searchSpansResult := []model.SearchSpansResult{
|
||||
model.SearchSpansResult{
|
||||
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues"},
|
||||
Events: make([][]interface{}, len(searchScanReponses)),
|
||||
},
|
||||
}
|
||||
|
||||
for i, item := range searchScanReponses {
|
||||
spanEvents := item.GetValues()
|
||||
searchSpansResult[0].Events[i] = spanEvents
|
||||
}
|
||||
|
||||
return &searchSpansResult, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServiceDBOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) {
|
||||
|
||||
var serviceDBOverviewItems []model.ServiceDBOverviewItem
|
||||
|
||||
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, dbSystem FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND dbName IS NOT NULL GROUP BY time, dbSystem ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceDBOverviewItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
for i, _ := range serviceDBOverviewItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceDBOverviewItems[i].Time)
|
||||
serviceDBOverviewItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
serviceDBOverviewItems[i].Time = ""
|
||||
serviceDBOverviewItems[i].CallRate = float32(serviceDBOverviewItems[i].NumCalls) / float32(queryParams.StepSeconds)
|
||||
}
|
||||
|
||||
if serviceDBOverviewItems == nil {
|
||||
serviceDBOverviewItems = []model.ServiceDBOverviewItem{}
|
||||
}
|
||||
|
||||
return &serviceDBOverviewItems, nil
|
||||
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServiceExternalAvgDuration(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
|
||||
var serviceExternalItems []model.ServiceExternalItem
|
||||
|
||||
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceExternalItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
for i, _ := range serviceExternalItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time)
|
||||
serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
serviceExternalItems[i].Time = ""
|
||||
serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds)
|
||||
}
|
||||
|
||||
if serviceExternalItems == nil {
|
||||
serviceExternalItems = []model.ServiceExternalItem{}
|
||||
}
|
||||
|
||||
return &serviceExternalItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServiceExternalErrors(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
|
||||
var serviceExternalErrorItems []model.ServiceExternalItem
|
||||
|
||||
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL AND statusCode >= 500 GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceExternalErrorItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
var serviceExternalTotalItems []model.ServiceExternalItem
|
||||
|
||||
queryTotal := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
errTotal := r.db.Select(&serviceExternalTotalItems, queryTotal)
|
||||
|
||||
if errTotal != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
m := make(map[string]int)
|
||||
|
||||
for j, _ := range serviceExternalErrorItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalErrorItems[j].Time)
|
||||
m[strconv.FormatInt(timeObj.UnixNano(), 10)+"-"+serviceExternalErrorItems[j].ExternalHttpUrl] = serviceExternalErrorItems[j].NumCalls
|
||||
}
|
||||
|
||||
for i, _ := range serviceExternalTotalItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalTotalItems[i].Time)
|
||||
serviceExternalTotalItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
serviceExternalTotalItems[i].Time = ""
|
||||
// serviceExternalTotalItems[i].CallRate = float32(serviceExternalTotalItems[i].NumCalls) / float32(queryParams.StepSeconds)
|
||||
|
||||
if val, ok := m[strconv.FormatInt(serviceExternalTotalItems[i].Timestamp, 10)+"-"+serviceExternalTotalItems[i].ExternalHttpUrl]; ok {
|
||||
serviceExternalTotalItems[i].NumErrors = val
|
||||
serviceExternalTotalItems[i].ErrorRate = float32(serviceExternalTotalItems[i].NumErrors) * 100 / float32(serviceExternalTotalItems[i].NumCalls)
|
||||
}
|
||||
serviceExternalTotalItems[i].CallRate = 0
|
||||
serviceExternalTotalItems[i].NumCalls = 0
|
||||
|
||||
}
|
||||
|
||||
if serviceExternalTotalItems == nil {
|
||||
serviceExternalTotalItems = []model.ServiceExternalItem{}
|
||||
}
|
||||
|
||||
return &serviceExternalTotalItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServiceExternal(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
|
||||
var serviceExternalItems []model.ServiceExternalItem
|
||||
|
||||
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceExternalItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
for i, _ := range serviceExternalItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time)
|
||||
serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
serviceExternalItems[i].Time = ""
|
||||
serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds)
|
||||
}
|
||||
|
||||
if serviceExternalItems == nil {
|
||||
serviceExternalItems = []model.ServiceExternalItem{}
|
||||
}
|
||||
|
||||
return &serviceExternalItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTopEndpoints(ctx context.Context, queryParams *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) {
|
||||
|
||||
var topEndpointsItems []model.TopEndpointsItem
|
||||
|
||||
query := fmt.Sprintf("SELECT quantile(0.5)(durationNano) as p50, quantile(0.95)(durationNano) as p95, quantile(0.99)(durationNano) as p99, COUNT(1) as numCalls, name FROM %s WHERE timestamp >= '%s' AND timestamp <= '%s' AND kind='2' and serviceName='%s' GROUP BY name", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName)
|
||||
|
||||
err := r.db.Select(&topEndpointsItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
if topEndpointsItems == nil {
|
||||
topEndpointsItems = []model.TopEndpointsItem{}
|
||||
}
|
||||
|
||||
return &topEndpointsItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) {
|
||||
|
||||
var usageItems []model.UsageItem
|
||||
|
||||
var query string
|
||||
if len(queryParams.ServiceName) != 0 {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
} else {
|
||||
query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE timestamp>='%s' AND timestamp<='%s' GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
}
|
||||
|
||||
err := r.db.Select(&usageItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
for i, _ := range usageItems {
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, usageItems[i].Time)
|
||||
usageItems[i].Timestamp = int64(timeObj.UnixNano())
|
||||
usageItems[i].Time = ""
|
||||
}
|
||||
|
||||
if usageItems == nil {
|
||||
usageItems = []model.UsageItem{}
|
||||
}
|
||||
|
||||
return &usageItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) {
|
||||
|
||||
services := []string{}
|
||||
|
||||
query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable)
|
||||
|
||||
err := r.db.Select(&services, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
return &services, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) {
|
||||
|
||||
tagItems := []model.TagItem{}
|
||||
|
||||
query := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagsKeys) as tagKeys FROM %s WHERE serviceName='%s' AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable, serviceName)
|
||||
|
||||
err := r.db.Select(&tagItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
return &tagItems, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) {
|
||||
|
||||
operations := []string{}
|
||||
|
||||
query := fmt.Sprintf(`SELECT DISTINCT(name) FROM %s WHERE serviceName='%s' AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable, serviceName)
|
||||
|
||||
err := r.db.Select(&operations, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
return &operations, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) {
|
||||
|
||||
var searchScanReponses []model.SearchSpanReponseItem
|
||||
|
||||
query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues, references FROM %s WHERE traceID='%s'", r.indexTable, traceId)
|
||||
|
||||
err := r.db.Select(&searchScanReponses, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
searchSpansResult := []model.SearchSpansResult{
|
||||
model.SearchSpansResult{
|
||||
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References"},
|
||||
Events: make([][]interface{}, len(searchScanReponses)),
|
||||
},
|
||||
}
|
||||
|
||||
for i, item := range searchScanReponses {
|
||||
spanEvents := item.GetValues()
|
||||
searchSpansResult[0].Events[i] = spanEvents
|
||||
}
|
||||
|
||||
return &searchSpansResult, nil
|
||||
|
||||
}
|
||||
func (r *ClickHouseReader) GetServiceMapDependencies(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
|
||||
serviceMapDependencyItems := []model.ServiceMapDependencyItem{}
|
||||
|
||||
query := fmt.Sprintf(`SELECT spanID, parentSpanID, serviceName FROM %s WHERE timestamp>='%s' AND timestamp<='%s'`, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
|
||||
|
||||
err := r.db.Select(&serviceMapDependencyItems, query)
|
||||
|
||||
zap.S().Info(query)
|
||||
|
||||
if err != nil {
|
||||
zap.S().Debug("Error in processing sql query: ", err)
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
}
|
||||
|
||||
serviceMap := make(map[string]*model.ServiceMapDependencyResponseItem)
|
||||
|
||||
spanId2ServiceNameMap := make(map[string]string)
|
||||
for i, _ := range serviceMapDependencyItems {
|
||||
spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId] = serviceMapDependencyItems[i].ServiceName
|
||||
}
|
||||
for i, _ := range serviceMapDependencyItems {
|
||||
parent2childServiceName := spanId2ServiceNameMap[serviceMapDependencyItems[i].ParentSpanId] + "-" + spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId]
|
||||
if _, ok := serviceMap[parent2childServiceName]; !ok {
|
||||
serviceMap[parent2childServiceName] = &model.ServiceMapDependencyResponseItem{
|
||||
Parent: spanId2ServiceNameMap[serviceMapDependencyItems[i].ParentSpanId],
|
||||
Child: spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId],
|
||||
CallCount: 1,
|
||||
}
|
||||
} else {
|
||||
serviceMap[parent2childServiceName].CallCount++
|
||||
}
|
||||
}
|
||||
|
||||
retMe := make([]model.ServiceMapDependencyResponseItem, 0, len(serviceMap))
|
||||
for _, dependency := range serviceMap {
|
||||
if dependency.Parent == "" {
|
||||
continue
|
||||
}
|
||||
retMe = append(retMe, *dependency)
|
||||
}
|
||||
|
||||
return &retMe, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) {
|
||||
|
||||
spanSearchAggregatesResponseItems := []model.SpanSearchAggregatesResponseItem{}
|
||||
|
||||
return spanSearchAggregatesResponseItems, nil
|
||||
|
||||
}
|
99
pkg/query-service/app/druidReader/reader.go
Normal file
99
pkg/query-service/app/druidReader/reader.go
Normal file
@ -0,0 +1,99 @@
|
||||
package druidReader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"go.signoz.io/query-service/druidQuery"
|
||||
"go.signoz.io/query-service/godruid"
|
||||
"go.signoz.io/query-service/model"
|
||||
)
|
||||
|
||||
type DruidReader struct {
|
||||
Client *godruid.Client
|
||||
SqlClient *druidQuery.SqlClient
|
||||
}
|
||||
|
||||
func NewReader() *DruidReader {
|
||||
|
||||
initialize()
|
||||
druidClientUrl := os.Getenv("DruidClientUrl")
|
||||
|
||||
client := godruid.Client{
|
||||
Url: druidClientUrl,
|
||||
Debug: true,
|
||||
}
|
||||
|
||||
sqlClient := druidQuery.SqlClient{
|
||||
Url: druidClientUrl,
|
||||
Debug: true,
|
||||
}
|
||||
return &DruidReader{
|
||||
Client: &client,
|
||||
SqlClient: &sqlClient,
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func initialize() {
|
||||
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) {
|
||||
return druidQuery.GetServiceOverview(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, error) {
|
||||
return druidQuery.GetServices(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error) {
|
||||
return druidQuery.SearchSpans(druid.Client, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) {
|
||||
return druidQuery.GetServiceDBOverview(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
return druidQuery.GetServiceExternalAvgDuration(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
return druidQuery.GetServiceExternalErrors(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
return druidQuery.GetServiceExternal(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetTopEndpoints(ctx context.Context, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) {
|
||||
return druidQuery.GetTopEndpoints(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error) {
|
||||
return druidQuery.GetUsage(druid.SqlClient, query)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) {
|
||||
return druidQuery.GetOperations(druid.SqlClient, serviceName)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) {
|
||||
return druidQuery.GetTags(druid.SqlClient, serviceName)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServicesList(ctx context.Context) (*[]string, error) {
|
||||
return druidQuery.GetServicesList(druid.SqlClient)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) {
|
||||
return druidQuery.SearchTraces(druid.Client, traceId)
|
||||
}
|
||||
|
||||
func (druid *DruidReader) GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
|
||||
return druidQuery.GetServiceMapDependencies(druid.SqlClient, query)
|
||||
}
|
||||
func (druid *DruidReader) SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) {
|
||||
return druidQuery.SearchSpansAggregate(druid.Client, queryParams)
|
||||
}
|
@ -1,14 +1,13 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/posthog/posthog-go"
|
||||
"go.signoz.io/query-service/druidQuery"
|
||||
"go.signoz.io/query-service/godruid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -23,17 +22,15 @@ type APIHandler struct {
|
||||
// queryParser queryParser
|
||||
basePath string
|
||||
apiPrefix string
|
||||
client *godruid.Client
|
||||
sqlClient *druidQuery.SqlClient
|
||||
reader *Reader
|
||||
pc *posthog.Client
|
||||
distinctId string
|
||||
}
|
||||
|
||||
// NewAPIHandler returns an APIHandler
|
||||
func NewAPIHandler(client *godruid.Client, sqlClient *druidQuery.SqlClient, pc *posthog.Client, distinctId string) *APIHandler {
|
||||
func NewAPIHandler(reader *Reader, pc *posthog.Client, distinctId string) *APIHandler {
|
||||
aH := &APIHandler{
|
||||
client: client,
|
||||
sqlClient: sqlClient,
|
||||
reader: reader,
|
||||
pc: pc,
|
||||
distinctId: distinctId,
|
||||
}
|
||||
@ -59,7 +56,7 @@ type structuredError struct {
|
||||
func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
|
||||
|
||||
router.HandleFunc("/api/v1/user", aH.user).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet)
|
||||
// router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/services", aH.getServices).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet)
|
||||
@ -115,7 +112,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetOperations(aH.sqlClient, serviceName)
|
||||
result, err := (*aH.reader).GetOperations(context.Background(), serviceName)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -126,7 +123,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
result, err := druidQuery.GetServicesList(aH.sqlClient)
|
||||
result, err := (*aH.reader).GetServicesList(context.Background())
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -139,7 +136,7 @@ func (aH *APIHandler) searchTags(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
serviceName := r.URL.Query().Get("service")
|
||||
|
||||
result, err := druidQuery.GetTags(aH.sqlClient, serviceName)
|
||||
result, err := (*aH.reader).GetTags(context.Background(), serviceName)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -155,7 +152,8 @@ func (aH *APIHandler) getTopEndpoints(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetTopEndpoints(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetTopEndpoints(context.Background(), query)
|
||||
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -171,7 +169,7 @@ func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetUsage(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetUsage(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -187,7 +185,8 @@ func (aH *APIHandler) getServiceDBOverview(w http.ResponseWriter, r *http.Reques
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetServiceDBOverview(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetServiceDBOverview(context.Background(), query)
|
||||
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -203,7 +202,7 @@ func (aH *APIHandler) getServiceExternal(w http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetServiceExternal(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetServiceExternal(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -219,7 +218,7 @@ func (aH *APIHandler) GetServiceExternalAvgDuration(w http.ResponseWriter, r *ht
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetServiceExternalAvgDuration(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetServiceExternalAvgDuration(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -235,7 +234,7 @@ func (aH *APIHandler) getServiceExternalErrors(w http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetServiceExternalErrors(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetServiceExternalErrors(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -251,7 +250,7 @@ func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetServiceOverview(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetServiceOverview(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -267,7 +266,7 @@ func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetServices(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetServices(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -289,7 +288,7 @@ func (aH *APIHandler) serviceMapDependencies(w http.ResponseWriter, r *http.Requ
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.GetServiceMapDependencies(aH.sqlClient, query)
|
||||
result, err := (*aH.reader).GetServiceMapDependencies(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -302,7 +301,7 @@ func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
traceId := vars["traceId"]
|
||||
|
||||
result, err := druidQuery.SearchTraces(aH.client, traceId)
|
||||
result, err := (*aH.reader).SearchTraces(context.Background(), traceId)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -310,6 +309,7 @@ func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) {
|
||||
aH.writeJSON(w, r, result)
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) searchSpansAggregates(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query, err := parseSearchSpanAggregatesRequest(r)
|
||||
@ -317,7 +317,7 @@ func (aH *APIHandler) searchSpansAggregates(w http.ResponseWriter, r *http.Reque
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.SearchSpansAggregate(aH.client, query)
|
||||
result, err := (*aH.reader).SearchSpansAggregate(context.Background(), query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -332,7 +332,9 @@ func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
result, err := druidQuery.SearchSpans(aH.client, query)
|
||||
// result, err := druidQuery.SearchSpans(aH.client, query)
|
||||
result, err := (*aH.reader).SearchSpans(context.Background(), query)
|
||||
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
@ -340,20 +342,20 @@ func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) {
|
||||
aH.writeJSON(w, r, result)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getApplicationPercentiles(w http.ResponseWriter, r *http.Request) {
|
||||
// vars := mux.Vars(r)
|
||||
// func (aH *APIHandler) getApplicationPercentiles(w http.ResponseWriter, r *http.Request) {
|
||||
// // vars := mux.Vars(r)
|
||||
|
||||
query, err := parseApplicationPercentileRequest(r)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
// query, err := parseApplicationPercentileRequest(r)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
|
||||
result, err := druidQuery.GetApplicationPercentiles(aH.client, query)
|
||||
if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
return
|
||||
}
|
||||
aH.writeJSON(w, r, result)
|
||||
}
|
||||
// result, err := (*aH.reader).GetApplicationPercentiles(context.Background(), query)
|
||||
// if aH.handleError(w, err, http.StatusBadRequest) {
|
||||
// return
|
||||
// }
|
||||
// aH.writeJSON(w, r, result)
|
||||
// }
|
||||
|
||||
func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode int) bool {
|
||||
if err == nil {
|
||||
|
26
pkg/query-service/app/interface.go
Normal file
26
pkg/query-service/app/interface.go
Normal file
@ -0,0 +1,26 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.signoz.io/query-service/model"
|
||||
)
|
||||
|
||||
type Reader interface {
|
||||
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error)
|
||||
GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, error)
|
||||
// GetApplicationPercentiles(ctx context.Context, query *model.ApplicationPercentileParams) ([]godruid.Timeseries, error)
|
||||
SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error)
|
||||
GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error)
|
||||
GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
||||
GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
||||
GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error)
|
||||
GetTopEndpoints(ctx context.Context, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error)
|
||||
GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error)
|
||||
GetOperations(ctx context.Context, serviceName string) (*[]string, error)
|
||||
GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error)
|
||||
GetServicesList(ctx context.Context) (*[]string, error)
|
||||
SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error)
|
||||
GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error)
|
||||
SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error)
|
||||
}
|
@ -38,6 +38,8 @@ func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams,
|
||||
StartTime: startTime.Format(time.RFC3339Nano),
|
||||
EndTime: endTime.Format(time.RFC3339Nano),
|
||||
ServiceName: serviceName,
|
||||
Start: startTime,
|
||||
End: endTime,
|
||||
}
|
||||
|
||||
return &getTopEndpointsParams, nil
|
||||
@ -64,12 +66,16 @@ func parseGetUsageRequest(r *http.Request) (*model.GetUsageParams, error) {
|
||||
}
|
||||
|
||||
serviceName := r.URL.Query().Get("service")
|
||||
stepHour := stepInt / 3600
|
||||
|
||||
getUsageParams := model.GetUsageParams{
|
||||
StartTime: startTime.Format(time.RFC3339Nano),
|
||||
EndTime: endTime.Format(time.RFC3339Nano),
|
||||
Start: startTime,
|
||||
End: endTime,
|
||||
ServiceName: serviceName,
|
||||
Period: fmt.Sprintf("PT%dH", stepInt/3600),
|
||||
Period: fmt.Sprintf("PT%dH", stepHour),
|
||||
StepHour: stepHour,
|
||||
}
|
||||
|
||||
return &getUsageParams, nil
|
||||
@ -101,7 +107,9 @@ func parseGetServiceExternalRequest(r *http.Request) (*model.GetServiceOverviewP
|
||||
}
|
||||
|
||||
getServiceOverviewParams := model.GetServiceOverviewParams{
|
||||
Start: startTime,
|
||||
StartTime: startTime.Format(time.RFC3339Nano),
|
||||
End: endTime,
|
||||
EndTime: endTime.Format(time.RFC3339Nano),
|
||||
ServiceName: serviceName,
|
||||
Period: fmt.Sprintf("PT%dM", stepInt/60),
|
||||
@ -137,7 +145,9 @@ func parseGetServiceOverviewRequest(r *http.Request) (*model.GetServiceOverviewP
|
||||
}
|
||||
|
||||
getServiceOverviewParams := model.GetServiceOverviewParams{
|
||||
Start: startTime,
|
||||
StartTime: startTime.Format(time.RFC3339Nano),
|
||||
End: endTime,
|
||||
EndTime: endTime.Format(time.RFC3339Nano),
|
||||
ServiceName: serviceName,
|
||||
Period: fmt.Sprintf("PT%dM", stepInt/60),
|
||||
@ -160,7 +170,9 @@ func parseGetServicesRequest(r *http.Request) (*model.GetServicesParams, error)
|
||||
}
|
||||
|
||||
getServicesParams := model.GetServicesParams{
|
||||
Start: startTime,
|
||||
StartTime: startTime.Format(time.RFC3339Nano),
|
||||
End: endTime,
|
||||
EndTime: endTime.Format(time.RFC3339Nano),
|
||||
Period: int(endTime.Unix() - startTime.Unix()),
|
||||
}
|
||||
@ -283,6 +295,8 @@ func parseSpanSearchRequest(r *http.Request) (*model.SpanSearchParams, error) {
|
||||
// fmt.Println(startTimeStr)
|
||||
params := &model.SpanSearchParams{
|
||||
Intervals: fmt.Sprintf("%s/%s", startTimeStr, endTimeStr),
|
||||
Start: startTime,
|
||||
End: endTime,
|
||||
Limit: 100,
|
||||
Order: "descending",
|
||||
}
|
||||
|
@ -1,8 +1,10 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@ -11,16 +13,16 @@ import (
|
||||
"github.com/posthog/posthog-go"
|
||||
"github.com/rs/cors"
|
||||
"github.com/soheilhy/cmux"
|
||||
"go.signoz.io/query-service/druidQuery"
|
||||
"go.signoz.io/query-service/godruid"
|
||||
"go.signoz.io/query-service/app/clickhouseReader"
|
||||
"go.signoz.io/query-service/app/druidReader"
|
||||
"go.signoz.io/query-service/healthcheck"
|
||||
"go.signoz.io/query-service/utils"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type ServerOptions struct {
|
||||
HTTPHostPort string
|
||||
DruidClientUrl string
|
||||
HTTPHostPort string
|
||||
// DruidClientUrl string
|
||||
}
|
||||
|
||||
// Server runs HTTP, Mux and a grpc server
|
||||
@ -28,11 +30,10 @@ type Server struct {
|
||||
// logger *zap.Logger
|
||||
// querySvc *querysvc.QueryService
|
||||
// queryOptions *QueryOptions
|
||||
serverOptions *ServerOptions
|
||||
|
||||
// tracer opentracing.Tracer // TODO make part of flags.Service
|
||||
|
||||
conn net.Listener
|
||||
serverOptions *ServerOptions
|
||||
conn net.Listener
|
||||
// grpcConn net.Listener
|
||||
httpConn net.Listener
|
||||
// grpcServer *grpc.Server
|
||||
@ -64,6 +65,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
httpServer, err := createHTTPServer()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Server{
|
||||
// logger: logger,
|
||||
@ -72,7 +78,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
// tracer: tracer,
|
||||
// grpcServer: grpcServer,
|
||||
serverOptions: serverOptions,
|
||||
httpServer: createHTTPServer(serverOptions.DruidClientUrl),
|
||||
httpServer: httpServer,
|
||||
separatePorts: true,
|
||||
// separatePorts: grpcPort != httpPort,
|
||||
unavailableChannel: make(chan healthcheck.Status),
|
||||
@ -82,22 +88,25 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
var posthogClient posthog.Client
|
||||
var distinctId string
|
||||
|
||||
func createHTTPServer(druidClientUrl string) *http.Server {
|
||||
func createHTTPServer() (*http.Server, error) {
|
||||
|
||||
posthogClient = posthog.New("H-htDCae7CR3RV57gUzmol6IAKtm5IMCvbcm_fwnL-w")
|
||||
distinctId = uuid.New().String()
|
||||
|
||||
client := godruid.Client{
|
||||
Url: druidClientUrl,
|
||||
Debug: true,
|
||||
var reader Reader
|
||||
|
||||
storage := os.Getenv("STORAGE")
|
||||
if storage == "druid" {
|
||||
zap.S().Info("Using Apache Druid as datastore ...")
|
||||
reader = druidReader.NewReader()
|
||||
} else if storage == "clickhouse" {
|
||||
zap.S().Info("Using ClickHouse as datastore ...")
|
||||
reader = clickhouseReader.NewReader()
|
||||
} else {
|
||||
return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage)
|
||||
}
|
||||
|
||||
sqlClient := druidQuery.SqlClient{
|
||||
Url: druidClientUrl,
|
||||
Debug: true,
|
||||
}
|
||||
|
||||
apiHandler := NewAPIHandler(&client, &sqlClient, &posthogClient, distinctId)
|
||||
apiHandler := NewAPIHandler(&reader, &posthogClient, distinctId)
|
||||
r := NewRouter()
|
||||
|
||||
r.Use(analyticsMiddleware)
|
||||
@ -118,7 +127,7 @@ func createHTTPServer(druidClientUrl string) *http.Server {
|
||||
|
||||
return &http.Server{
|
||||
Handler: handler,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func loggingMiddleware(next http.Handler) http.Handler {
|
||||
|
@ -11,92 +11,6 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type ServiceItem struct {
|
||||
ServiceName string `json:"serviceName"`
|
||||
Percentile99 float32 `json:"p99"`
|
||||
AvgDuration float32 `json:"avgDuration"`
|
||||
NumCalls int `json:"numCalls"`
|
||||
CallRate float32 `json:"callRate"`
|
||||
NumErrors int `json:"numErrors"`
|
||||
ErrorRate float32 `json:"errorRate"`
|
||||
Num4XX int `json:"num4XX"`
|
||||
FourXXRate float32 `json:"fourXXRate"`
|
||||
}
|
||||
type ServiceListErrorItem struct {
|
||||
ServiceName string `json:"serviceName"`
|
||||
NumErrors int `json:"numErrors"`
|
||||
Num4xx int `json:"num4xx"`
|
||||
}
|
||||
|
||||
type ServiceErrorItem struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
NumErrors int `json:"numErrors"`
|
||||
}
|
||||
|
||||
type ServiceOverviewItem struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Percentile50 float32 `json:"p50"`
|
||||
Percentile95 float32 `json:"p95"`
|
||||
Percentile99 float32 `json:"p99"`
|
||||
NumCalls int `json:"numCalls"`
|
||||
CallRate float32 `json:"callRate"`
|
||||
NumErrors int `json:"numErrors"`
|
||||
ErrorRate float32 `json:"errorRate"`
|
||||
}
|
||||
|
||||
type ServiceExternalItem struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"`
|
||||
ExternalHttpUrl string `json:"externalHttpUrl,omitempty"`
|
||||
AvgDuration float32 `json:"avgDuration,omitempty"`
|
||||
NumCalls int `json:"numCalls,omitempty"`
|
||||
CallRate float32 `json:"callRate,omitempty"`
|
||||
NumErrors int `json:"numErrors"`
|
||||
ErrorRate float32 `json:"errorRate"`
|
||||
}
|
||||
|
||||
type ServiceDBOverviewItem struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"`
|
||||
DBSystem string `json:"dbSystem,omitempty"`
|
||||
AvgDuration float32 `json:"avgDuration,omitempty"`
|
||||
NumCalls int `json:"numCalls,omitempty"`
|
||||
CallRate float32 `json:"callRate,omitempty"`
|
||||
}
|
||||
|
||||
type ServiceMapDependencyItem struct {
|
||||
SpanId string `json:"spanId,omitempty"`
|
||||
ParentSpanId string `json:"parentSpanId,omitempty"`
|
||||
ServiceName string `json:"serviceName,omitempty"`
|
||||
}
|
||||
|
||||
type UsageItem struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Count int64 `json:"count"`
|
||||
}
|
||||
|
||||
type TopEnpointsItem struct {
|
||||
Percentile50 float32 `json:"p50"`
|
||||
Percentile90 float32 `json:"p90"`
|
||||
Percentile99 float32 `json:"p99"`
|
||||
NumCalls int `json:"numCalls"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
type TagItem struct {
|
||||
TagKeys string `json:"tagKeys"`
|
||||
TagCount int `json:"tagCount"`
|
||||
}
|
||||
|
||||
type ServiceMapDependencyResponseItem struct {
|
||||
Parent string `json:"parent,omitempty"`
|
||||
Child string `json:"child,omitempty"`
|
||||
CallCount int `json:"callCount,omitempty"`
|
||||
}
|
||||
|
||||
func GetOperations(client *SqlClient, serviceName string) (*[]string, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT DISTINCT(Name) FROM %s WHERE ServiceName='%s' AND __time > CURRENT_TIMESTAMP - INTERVAL '1' DAY`, constants.DruidDatasource, serviceName)
|
||||
@ -155,7 +69,7 @@ func GetServicesList(client *SqlClient) (*[]string, error) {
|
||||
return &servicesListReponse, nil
|
||||
}
|
||||
|
||||
func GetTags(client *SqlClient, serviceName string) (*[]TagItem, error) {
|
||||
func GetTags(client *SqlClient, serviceName string) (*[]model.TagItem, error) {
|
||||
|
||||
var sqlQuery string
|
||||
|
||||
@ -176,7 +90,7 @@ func GetTags(client *SqlClient, serviceName string) (*[]TagItem, error) {
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
res := new([]TagItem)
|
||||
res := new([]model.TagItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -187,9 +101,9 @@ func GetTags(client *SqlClient, serviceName string) (*[]TagItem, error) {
|
||||
return &tagResponse, nil
|
||||
}
|
||||
|
||||
func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]TopEnpointsItem, error) {
|
||||
func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.9) as p90, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT(SpanId) as numCalls, Name FROM "%s" WHERE "__time" >= '%s' AND "__time" <= '%s' AND "Kind"='2' and "ServiceName"='%s' GROUP BY Name`, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName)
|
||||
sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.95) as p95, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT(SpanId) as numCalls, Name FROM "%s" WHERE "__time" >= '%s' AND "__time" <= '%s' AND "Kind"='2' and "ServiceName"='%s' GROUP BY Name`, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName)
|
||||
|
||||
// zap.S().Debug(sqlQuery)
|
||||
|
||||
@ -202,7 +116,7 @@ func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
res := new([]TopEnpointsItem)
|
||||
res := new([]model.TopEndpointsItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -213,7 +127,7 @@ func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]
|
||||
return &topEnpointsResponse, nil
|
||||
}
|
||||
|
||||
func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]UsageItem, error) {
|
||||
func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]model.UsageItem, error) {
|
||||
|
||||
var sqlQuery string
|
||||
|
||||
@ -236,7 +150,7 @@ func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]UsageItem, err
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
res := new([]UsageItem)
|
||||
res := new([]model.UsageItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -253,7 +167,7 @@ func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]UsageItem, err
|
||||
return &usageResponse, nil
|
||||
}
|
||||
|
||||
func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) {
|
||||
func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration" FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND "__time" >= '%s' AND "__time" <= '%s'
|
||||
GROUP BY TIME_FLOOR(__time, '%s')`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period)
|
||||
@ -270,7 +184,7 @@ func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOve
|
||||
// responseStr := string(response)
|
||||
// zap.S().Info(responseStr)
|
||||
|
||||
res := new([]ServiceExternalItem)
|
||||
res := new([]model.ServiceExternalItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -289,7 +203,7 @@ func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOve
|
||||
return &servicesExternalResponse, nil
|
||||
}
|
||||
|
||||
func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) {
|
||||
func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", COUNT(SpanId) as "numCalls", ExternalHttpUrl as externalHttpUrl FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND StatusCode >= 500 AND "__time" >= '%s' AND "__time" <= '%s'
|
||||
GROUP BY TIME_FLOOR(__time, '%s'), ExternalHttpUrl`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period)
|
||||
@ -306,7 +220,7 @@ func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverview
|
||||
// responseStr := string(response)
|
||||
// zap.S().Info(responseStr)
|
||||
|
||||
res := new([]ServiceExternalItem)
|
||||
res := new([]model.ServiceExternalItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -328,7 +242,7 @@ func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverview
|
||||
// responseStr := string(response)
|
||||
// zap.S().Info(responseStr)
|
||||
|
||||
resTotal := new([]ServiceExternalItem)
|
||||
resTotal := new([]model.ServiceExternalItem)
|
||||
err = json.Unmarshal(responseTotal, resTotal)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -361,7 +275,7 @@ func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverview
|
||||
return &servicesExternalResponse, nil
|
||||
}
|
||||
|
||||
func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) {
|
||||
func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration", COUNT(SpanId) as "numCalls", ExternalHttpUrl as externalHttpUrl FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != ''
|
||||
AND "__time" >= '%s' AND "__time" <= '%s'
|
||||
@ -379,7 +293,7 @@ func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams
|
||||
// responseStr := string(response)
|
||||
// zap.S().Info(responseStr)
|
||||
|
||||
res := new([]ServiceExternalItem)
|
||||
res := new([]model.ServiceExternalItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -398,7 +312,7 @@ func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams
|
||||
return &servicesExternalResponse, nil
|
||||
}
|
||||
|
||||
func GetServiceDBOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceDBOverviewItem, error) {
|
||||
func GetServiceDBOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration", COUNT(SpanId) as "numCalls", DBSystem as "dbSystem" FROM %s WHERE ServiceName='%s' AND Kind='3' AND DBName IS NOT NULL
|
||||
AND "__time" >= '%s' AND "__time" <= '%s'
|
||||
@ -416,7 +330,7 @@ func GetServiceDBOverview(client *SqlClient, query *model.GetServiceOverviewPara
|
||||
// responseStr := string(response)
|
||||
// zap.S().Info(responseStr)
|
||||
|
||||
res := new([]ServiceDBOverviewItem)
|
||||
res := new([]model.ServiceDBOverviewItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -435,7 +349,7 @@ func GetServiceDBOverview(client *SqlClient, query *model.GetServiceOverviewPara
|
||||
return &servicesDBOverviewResponse, nil
|
||||
}
|
||||
|
||||
func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceOverviewItem, error) {
|
||||
func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.95) as p95,
|
||||
APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT("SpanId") as "numCalls" FROM "%s" WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' and "ServiceName"='%s' GROUP BY TIME_FLOOR(__time, '%s') `, query.Period, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName, query.Period)
|
||||
@ -451,7 +365,7 @@ func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
res := new([]ServiceOverviewItem)
|
||||
res := new([]model.ServiceOverviewItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -471,7 +385,7 @@ func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
resError := new([]ServiceErrorItem)
|
||||
resError := new([]model.ServiceErrorItem)
|
||||
err = json.Unmarshal(responseError, resError)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -501,7 +415,7 @@ func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams
|
||||
return &servicesOverviewResponse, nil
|
||||
}
|
||||
|
||||
func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceItem, error) {
|
||||
func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]model.ServiceItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.99) as "p99", AVG("DurationNano") as "avgDuration", COUNT(SpanId) as numCalls, "ServiceName" as "serviceName" FROM %s WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' GROUP BY "ServiceName" ORDER BY "p99" DESC`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
|
||||
@ -516,7 +430,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
res := new([]ServiceItem)
|
||||
res := new([]model.ServiceItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -538,7 +452,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
resError := new([]ServiceListErrorItem)
|
||||
resError := new([]model.ServiceListErrorItem)
|
||||
err = json.Unmarshal(responseError, resError)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -555,7 +469,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI
|
||||
|
||||
////////////////// Below block gets 4xx of services
|
||||
|
||||
sqlQuery = fmt.Sprintf(`SELECT COUNT(SpanId) as numErrors, "ServiceName" as "serviceName" FROM %s WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' and "StatusCode">=400 and "StatusCode" < 500 GROUP BY "ServiceName"`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
sqlQuery = fmt.Sprintf(`SELECT COUNT(SpanId) as num4xx, "ServiceName" as "serviceName" FROM %s WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' and "StatusCode">=400 and "StatusCode" < 500 GROUP BY "ServiceName"`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
|
||||
response4xx, err := client.Query(sqlQuery, "object")
|
||||
|
||||
@ -568,7 +482,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI
|
||||
|
||||
// zap.S().Info(string(response))
|
||||
|
||||
res4xx := new([]ServiceListErrorItem)
|
||||
res4xx := new([]model.ServiceListErrorItem)
|
||||
err = json.Unmarshal(response4xx, res4xx)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -601,9 +515,9 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI
|
||||
return &servicesResponse, nil
|
||||
}
|
||||
|
||||
func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams) (*[]ServiceMapDependencyResponseItem, error) {
|
||||
func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
|
||||
|
||||
sqlQuery := fmt.Sprintf(`SELECT SpanId, ParentSpanId, ServiceName FROM %s WHERE "__time" >= '%s' AND "__time" <= '%s' ORDER BY __time DESC LIMIT 100000`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
sqlQuery := fmt.Sprintf(`SELECT SpanId, ParentSpanId, ServiceName FROM %s WHERE "__time" >= '%s' AND "__time" <= '%s' ORDER BY __time DESC`, constants.DruidDatasource, query.StartTime, query.EndTime)
|
||||
|
||||
// zap.S().Debug(sqlQuery)
|
||||
|
||||
@ -617,7 +531,7 @@ func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams
|
||||
// responseStr := string(response)
|
||||
// zap.S().Info(responseStr)
|
||||
|
||||
res := new([]ServiceMapDependencyItem)
|
||||
res := new([]model.ServiceMapDependencyItem)
|
||||
err = json.Unmarshal(response, res)
|
||||
if err != nil {
|
||||
zap.S().Error(err)
|
||||
@ -626,7 +540,7 @@ func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams
|
||||
// resCount := len(*res)
|
||||
// fmt.Println(resCount)
|
||||
|
||||
serviceMap := make(map[string]*ServiceMapDependencyResponseItem)
|
||||
serviceMap := make(map[string]*model.ServiceMapDependencyResponseItem)
|
||||
|
||||
spanId2ServiceNameMap := make(map[string]string)
|
||||
for i, _ := range *res {
|
||||
@ -635,7 +549,7 @@ func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams
|
||||
for i, _ := range *res {
|
||||
parent2childServiceName := spanId2ServiceNameMap[(*res)[i].ParentSpanId] + "-" + spanId2ServiceNameMap[(*res)[i].SpanId]
|
||||
if _, ok := serviceMap[parent2childServiceName]; !ok {
|
||||
serviceMap[parent2childServiceName] = &ServiceMapDependencyResponseItem{
|
||||
serviceMap[parent2childServiceName] = &model.ServiceMapDependencyResponseItem{
|
||||
Parent: spanId2ServiceNameMap[(*res)[i].ParentSpanId],
|
||||
Child: spanId2ServiceNameMap[(*res)[i].SpanId],
|
||||
CallCount: 1,
|
||||
@ -645,7 +559,7 @@ func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams
|
||||
}
|
||||
}
|
||||
|
||||
retMe := make([]ServiceMapDependencyResponseItem, 0, len(serviceMap))
|
||||
retMe := make([]model.ServiceMapDependencyResponseItem, 0, len(serviceMap))
|
||||
for _, dependency := range serviceMap {
|
||||
if dependency.Parent == "" {
|
||||
continue
|
||||
|
@ -27,11 +27,6 @@ type SpanSearchAggregatesDuratonReceivedItem struct {
|
||||
Result DurationItem `json:"result"`
|
||||
}
|
||||
|
||||
type SpanSearchAggregatesResponseItem struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Value float32 `json:"value"`
|
||||
}
|
||||
|
||||
func buildFilters(queryParams *model.SpanSearchParams) (*godruid.Filter, error) {
|
||||
|
||||
var filter *godruid.Filter
|
||||
@ -181,7 +176,7 @@ func buildFiltersForSpansAggregates(queryParams *model.SpanSearchAggregatesParam
|
||||
|
||||
}
|
||||
|
||||
func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult, error) {
|
||||
func SearchTraces(client *godruid.Client, traceId string) (*[]model.SearchSpansResult, error) {
|
||||
|
||||
filter := godruid.FilterSelector("TraceId", traceId)
|
||||
|
||||
@ -206,10 +201,20 @@ func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult,
|
||||
|
||||
// fmt.Printf("query.QueryResult:\n%v", query.QueryResult)
|
||||
|
||||
return query.QueryResult, nil
|
||||
var searchSpansResult []model.SearchSpansResult
|
||||
searchSpansResult = make([]model.SearchSpansResult, len(query.QueryResult))
|
||||
|
||||
searchSpansResult[0].Columns = make([]string, len(query.QueryResult[0].Columns))
|
||||
copy(searchSpansResult[0].Columns, query.QueryResult[0].Columns)
|
||||
|
||||
searchSpansResult[0].Events = make([][]interface{}, len(query.QueryResult[0].Events))
|
||||
copy(searchSpansResult[0].Events, query.QueryResult[0].Events)
|
||||
|
||||
return &searchSpansResult, nil
|
||||
|
||||
}
|
||||
|
||||
func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchAggregatesParams) ([]SpanSearchAggregatesResponseItem, error) {
|
||||
func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) {
|
||||
|
||||
filter, err := buildFiltersForSpansAggregates(queryParams)
|
||||
var needsPostAggregation bool = true
|
||||
@ -293,7 +298,7 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA
|
||||
return nil, fmt.Errorf("Error in unmarshalling response from druid")
|
||||
}
|
||||
|
||||
var response []SpanSearchAggregatesResponseItem
|
||||
var response []model.SpanSearchAggregatesResponseItem
|
||||
|
||||
for _, elem := range *receivedResponse {
|
||||
|
||||
@ -304,7 +309,7 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA
|
||||
if queryParams.AggregationOption == "rate_per_sec" {
|
||||
value = elem.Result.Value * 1.0 / float32(queryParams.StepSeconds)
|
||||
}
|
||||
response = append(response, SpanSearchAggregatesResponseItem{
|
||||
response = append(response, model.SpanSearchAggregatesResponseItem{
|
||||
Timestamp: timestamp,
|
||||
Value: value,
|
||||
})
|
||||
@ -316,7 +321,7 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) ([]godruid.ScanResult, error) {
|
||||
func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) (*[]model.SearchSpansResult, error) {
|
||||
|
||||
filter, err := buildFilters(queryParams)
|
||||
|
||||
@ -347,7 +352,16 @@ func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) ([
|
||||
|
||||
// fmt.Printf("query.QueryResult:\n%v", query.QueryResult)
|
||||
|
||||
return query.QueryResult, nil
|
||||
var searchSpansResult []model.SearchSpansResult
|
||||
searchSpansResult = make([]model.SearchSpansResult, len(query.QueryResult))
|
||||
|
||||
searchSpansResult[0].Columns = make([]string, len(query.QueryResult[0].Columns))
|
||||
copy(searchSpansResult[0].Columns, query.QueryResult[0].Columns)
|
||||
|
||||
searchSpansResult[0].Events = make([][]interface{}, len(query.QueryResult[0].Events))
|
||||
copy(searchSpansResult[0].Events, query.QueryResult[0].Events)
|
||||
|
||||
return &searchSpansResult, nil
|
||||
}
|
||||
|
||||
func GetApplicationPercentiles(client *godruid.Client, queryParams *model.ApplicationPercentileParams) ([]godruid.Timeseries, error) {
|
||||
|
@ -3,10 +3,14 @@ module go.signoz.io/query-service
|
||||
go 1.14
|
||||
|
||||
require (
|
||||
github.com/ClickHouse/clickhouse-go v1.4.5
|
||||
github.com/gogo/protobuf v1.2.1
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/gorilla/handlers v1.5.1
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/jaegertracing/jaeger v1.21.0
|
||||
github.com/jmoiron/sqlx v1.3.4
|
||||
github.com/opentracing/opentracing-go v1.1.0
|
||||
github.com/ory/viper v1.7.5
|
||||
github.com/posthog/posthog-go v0.0.0-20200525173953-e46dc8e6b89b
|
||||
github.com/rs/cors v1.7.0
|
||||
|
@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
|
||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/ClickHouse/clickhouse-go v1.4.5 h1:FfhyEnv6/BaWldyjgT2k4gDDmeNwJ9C4NbY/MXxJlXk=
|
||||
github.com/ClickHouse/clickhouse-go v1.4.5/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
|
||||
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4=
|
||||
@ -40,6 +42,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
|
||||
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/bsm/sarama-cluster v2.1.13+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM=
|
||||
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
|
||||
@ -50,6 +53,7 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
|
||||
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
|
||||
@ -166,6 +170,7 @@ github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2K
|
||||
github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo=
|
||||
github.com/go-openapi/validate v0.19.8/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4=
|
||||
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
|
||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0=
|
||||
github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY=
|
||||
@ -277,8 +282,12 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
|
||||
github.com/jaegertracing/jaeger v1.21.0 h1:Fgre3vTI5E/cmkXKBXK7ksnzul5b/3gXjA3mQzt0+58=
|
||||
github.com/jaegertracing/jaeger v1.21.0/go.mod h1:PCTGGFohQBPQMR4j333V5lt6If7tj8aWJ+pQNgvZ+wU=
|
||||
github.com/jaegertracing/jaeger v1.22.0 h1:kFBhBn9XSB8V68DjD3t6qb/IUAJLLtyJ/27caGQOu7E=
|
||||
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
|
||||
github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w=
|
||||
github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
|
||||
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
|
||||
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
@ -304,6 +313,7 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
|
||||
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
|
||||
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
||||
@ -329,6 +339,8 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
|
||||
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
|
||||
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
|
||||
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
|
||||
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
|
||||
|
@ -57,14 +57,12 @@ func main() {
|
||||
logger := loggerMgr.Sugar()
|
||||
logger.Debug("START!")
|
||||
|
||||
// v := initViper()
|
||||
|
||||
serverOptions := &app.ServerOptions{
|
||||
// HTTPHostPort: v.GetString(app.HTTPHostPort),
|
||||
// DruidClientUrl: v.GetString(app.DruidClientUrl),
|
||||
|
||||
HTTPHostPort: constants.HTTPHostPort,
|
||||
DruidClientUrl: constants.DruidClientUrl,
|
||||
HTTPHostPort: constants.HTTPHostPort,
|
||||
// DruidClientUrl: constants.DruidClientUrl,
|
||||
}
|
||||
|
||||
server, err := app.NewServer(serverOptions)
|
||||
|
@ -1,11 +1,16 @@
|
||||
package model
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type GetTopEndpointsParams struct {
|
||||
StartTime string
|
||||
EndTime string
|
||||
ServiceName string
|
||||
Start *time.Time
|
||||
End *time.Time
|
||||
}
|
||||
|
||||
type GetUsageParams struct {
|
||||
@ -13,17 +18,24 @@ type GetUsageParams struct {
|
||||
EndTime string
|
||||
ServiceName string
|
||||
Period string
|
||||
StepHour int
|
||||
Start *time.Time
|
||||
End *time.Time
|
||||
}
|
||||
|
||||
type GetServicesParams struct {
|
||||
StartTime string
|
||||
EndTime string
|
||||
Period int
|
||||
Start *time.Time
|
||||
End *time.Time
|
||||
}
|
||||
|
||||
type GetServiceOverviewParams struct {
|
||||
StartTime string
|
||||
EndTime string
|
||||
Start *time.Time
|
||||
End *time.Time
|
||||
ServiceName string
|
||||
Period string
|
||||
StepSeconds int
|
||||
@ -67,6 +79,8 @@ type SpanSearchParams struct {
|
||||
OperationName string
|
||||
Kind string
|
||||
Intervals string
|
||||
Start *time.Time
|
||||
End *time.Time
|
||||
MinDuration string
|
||||
MaxDuration string
|
||||
Limit int64
|
169
pkg/query-service/model/response.go
Normal file
169
pkg/query-service/model/response.go
Normal file
@ -0,0 +1,169 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ServiceItem struct {
|
||||
ServiceName string `json:"serviceName" db:"serviceName"`
|
||||
Percentile99 float32 `json:"p99" db:"p99"`
|
||||
AvgDuration float32 `json:"avgDuration" db:"avgDuration"`
|
||||
NumCalls int `json:"numCalls" db:"numCalls"`
|
||||
CallRate float32 `json:"callRate" db:"callRate"`
|
||||
NumErrors int `json:"numErrors" db:"numErrors"`
|
||||
ErrorRate float32 `json:"errorRate" db:"errorRate"`
|
||||
Num4XX int `json:"num4XX" db:"num4xx"`
|
||||
FourXXRate float32 `json:"fourXXRate" db:"fourXXRate"`
|
||||
}
|
||||
|
||||
type ServiceListErrorItem struct {
|
||||
ServiceName string `json:"serviceName"`
|
||||
NumErrors int `json:"numErrors"`
|
||||
Num4xx int `json:"num4xx"`
|
||||
}
|
||||
|
||||
type ServiceErrorItem struct {
|
||||
Time string `json:"time,omitempty" db:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp" db:"timestamp"`
|
||||
NumErrors int `json:"numErrors" db:"numErrors"`
|
||||
}
|
||||
|
||||
type ServiceOverviewItem struct {
|
||||
Time string `json:"time,omitempty" db:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp" db:"timestamp"`
|
||||
Percentile50 float32 `json:"p50" db:"p50"`
|
||||
Percentile95 float32 `json:"p95" db:"p95"`
|
||||
Percentile99 float32 `json:"p99" db:"p99"`
|
||||
NumCalls int `json:"numCalls" db:"numCalls"`
|
||||
CallRate float32 `json:"callRate" db:"callRate"`
|
||||
NumErrors int `json:"numErrors" db:"numErrors"`
|
||||
ErrorRate float32 `json:"errorRate" db:"errorRate"`
|
||||
}
|
||||
|
||||
type SearchSpansResult struct {
|
||||
Columns []string `json:"columns"`
|
||||
Events [][]interface{} `json:"events"`
|
||||
}
|
||||
|
||||
type TraceResult struct {
|
||||
Data []interface{} `json:"data" db:"data"`
|
||||
Total int `json:"total" db:"total"`
|
||||
Limit int `json:"limit" db:"limit"`
|
||||
Offset int `json:"offset" db:"offset"`
|
||||
}
|
||||
type TraceResultItem struct {
|
||||
TraceID string
|
||||
Spans []TraceResultSpan
|
||||
}
|
||||
type TraceResultSpan struct {
|
||||
Timestamp string `db:"timestamp"`
|
||||
SpanID string `db:"spanID"`
|
||||
TraceID string `db:"traceID"`
|
||||
ServiceName string `db:"serviceName"`
|
||||
Name string `db:"name"`
|
||||
Kind int32 `db:"kind"`
|
||||
DurationNano int64 `db:"durationNano"`
|
||||
TagsKeys []string `db:"tagsKeys"`
|
||||
TagsValues []string `db:"tagsValues"`
|
||||
}
|
||||
|
||||
type SearchSpanReponseItem struct {
|
||||
Timestamp string `db:"timestamp"`
|
||||
SpanID string `db:"spanID"`
|
||||
TraceID string `db:"traceID"`
|
||||
ServiceName string `db:"serviceName"`
|
||||
Name string `db:"name"`
|
||||
Kind int32 `db:"kind"`
|
||||
References string `db:"references,omitempty"`
|
||||
DurationNano int64 `db:"durationNano"`
|
||||
TagsKeys []string `db:"tagsKeys"`
|
||||
TagsValues []string `db:"tagsValues"`
|
||||
}
|
||||
|
||||
type OtelSpanRef struct {
|
||||
TraceId string `json:"traceId,omitempty"`
|
||||
SpanId string `json:"spanId,omitempty"`
|
||||
RefType string `json:"refType,omitempty"`
|
||||
}
|
||||
|
||||
func (ref *OtelSpanRef) toString() string {
|
||||
|
||||
retString := fmt.Sprintf(`{TraceId=%s, SpanId=%s, RefType=%s}`, ref.TraceId, ref.SpanId, ref.RefType)
|
||||
|
||||
return retString
|
||||
}
|
||||
|
||||
func (item *SearchSpanReponseItem) GetValues() []interface{} {
|
||||
|
||||
timeObj, _ := time.Parse(time.RFC3339Nano, item.Timestamp)
|
||||
references := []OtelSpanRef{}
|
||||
json.Unmarshal([]byte(item.References), &references)
|
||||
|
||||
referencesStringArray := []string{}
|
||||
for _, item := range references {
|
||||
referencesStringArray = append(referencesStringArray, item.toString())
|
||||
}
|
||||
|
||||
returnArray := []interface{}{int64(timeObj.UnixNano() / 1000000), item.SpanID, item.TraceID, item.ServiceName, item.Name, strconv.Itoa(int(item.Kind)), strconv.FormatInt(item.DurationNano, 10), item.TagsKeys, item.TagsValues, referencesStringArray}
|
||||
|
||||
return returnArray
|
||||
}
|
||||
|
||||
type ServiceExternalItem struct {
|
||||
Time string `json:"time,omitempty" db:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty" db:"timestamp,omitempty"`
|
||||
ExternalHttpUrl string `json:"externalHttpUrl,omitempty" db:"externalHttpUrl,omitempty"`
|
||||
AvgDuration float32 `json:"avgDuration,omitempty" db:"avgDuration,omitempty"`
|
||||
NumCalls int `json:"numCalls,omitempty" db:"numCalls,omitempty"`
|
||||
CallRate float32 `json:"callRate,omitempty" db:"callRate,omitempty"`
|
||||
NumErrors int `json:"numErrors" db:"numErrors"`
|
||||
ErrorRate float32 `json:"errorRate" db:"errorRate"`
|
||||
}
|
||||
|
||||
type ServiceDBOverviewItem struct {
|
||||
Time string `json:"time,omitempty" db:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp,omitempty" db:"timestamp,omitempty"`
|
||||
DBSystem string `json:"dbSystem,omitempty" db:"dbSystem,omitempty"`
|
||||
AvgDuration float32 `json:"avgDuration,omitempty" db:"avgDuration,omitempty"`
|
||||
NumCalls int `json:"numCalls,omitempty" db:"numCalls,omitempty"`
|
||||
CallRate float32 `json:"callRate,omitempty" db:"callRate,omitempty"`
|
||||
}
|
||||
|
||||
type ServiceMapDependencyItem struct {
|
||||
SpanId string `json:"spanId,omitempty" db:"spanID,omitempty"`
|
||||
ParentSpanId string `json:"parentSpanId,omitempty" db:"parentSpanID,omitempty"`
|
||||
ServiceName string `json:"serviceName,omitempty" db:"serviceName,omitempty"`
|
||||
}
|
||||
|
||||
type UsageItem struct {
|
||||
Time string `json:"time,omitempty" db:"time,omitempty"`
|
||||
Timestamp int64 `json:"timestamp" db:"timestamp"`
|
||||
Count int64 `json:"count" db:"count"`
|
||||
}
|
||||
|
||||
type TopEndpointsItem struct {
|
||||
Percentile50 float32 `json:"p50" db:"p50"`
|
||||
Percentile95 float32 `json:"p95" db:"p95"`
|
||||
Percentile99 float32 `json:"p99" db:"p99"`
|
||||
NumCalls int `json:"numCalls" db:"numCalls"`
|
||||
Name string `json:"name" db:"name"`
|
||||
}
|
||||
|
||||
type TagItem struct {
|
||||
TagKeys string `json:"tagKeys" db:"tagKeys"`
|
||||
TagCount int `json:"tagCount" db:"tagCount"`
|
||||
}
|
||||
|
||||
type ServiceMapDependencyResponseItem struct {
|
||||
Parent string `json:"parent,omitempty" db:"parent,omitempty"`
|
||||
Child string `json:"child,omitempty" db:"child,omitempty"`
|
||||
CallCount int `json:"callCount,omitempty" db:"callCount,omitempty"`
|
||||
}
|
||||
|
||||
type SpanSearchAggregatesResponseItem struct {
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Value float32 `json:"value"`
|
||||
}
|
20
pkg/query-service/test/clickhouse/clickhouse.go
Normal file
20
pkg/query-service/test/clickhouse/clickhouse.go
Normal file
@ -0,0 +1,20 @@
|
||||
package clickhouse
|
||||
|
||||
type ClickHouseReader struct {
|
||||
ClickHouseClientUrl string
|
||||
}
|
||||
|
||||
func connect() string {
|
||||
return "Connected to ClickHouse"
|
||||
}
|
||||
|
||||
func NewSpanReader() *ClickHouseReader {
|
||||
connect()
|
||||
return &ClickHouseReader{
|
||||
ClickHouseClientUrl: "http://localhost:9000",
|
||||
}
|
||||
}
|
||||
|
||||
func (chReader *ClickHouseReader) GetServices() string {
|
||||
return "Hello from ClickHouse"
|
||||
}
|
20
pkg/query-service/test/druid/druid.go
Normal file
20
pkg/query-service/test/druid/druid.go
Normal file
@ -0,0 +1,20 @@
|
||||
package druid
|
||||
|
||||
type DruidReader struct {
|
||||
DruidClientUrl string
|
||||
}
|
||||
|
||||
func connect() string {
|
||||
return "Connected to Druid"
|
||||
}
|
||||
|
||||
func NewSpanReader() *DruidReader {
|
||||
connect()
|
||||
return &DruidReader{
|
||||
DruidClientUrl: "http://localhost:8888",
|
||||
}
|
||||
}
|
||||
|
||||
func (druidReader *DruidReader) GetServices() string {
|
||||
return "Hello from Druid"
|
||||
}
|
27
pkg/query-service/test/test.go
Normal file
27
pkg/query-service/test/test.go
Normal file
@ -0,0 +1,27 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"go.signoz.io/query-service/test/clickhouse"
|
||||
"go.signoz.io/query-service/test/druid"
|
||||
)
|
||||
|
||||
type StorageReader interface {
|
||||
GetServices() string
|
||||
}
|
||||
|
||||
func main() {
|
||||
storage := os.Getenv("STORAGE")
|
||||
var client StorageReader
|
||||
|
||||
if storage == "druid" {
|
||||
client = druid.NewSpanReader()
|
||||
} else if storage == "clickhouse" {
|
||||
client = clickhouse.NewSpanReader()
|
||||
}
|
||||
|
||||
services := client.GetServices()
|
||||
fmt.Println(services)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user