diff --git a/node_modules/.yarn-integrity b/node_modules/.yarn-integrity new file mode 100644 index 0000000000..0f19eb7ab3 --- /dev/null +++ b/node_modules/.yarn-integrity @@ -0,0 +1,10 @@ +{ + "systemParams": "darwin-x64-83", + "modulesFolders": [], + "flags": [], + "linkedModules": [], + "topLevelPatterns": [], + "lockfileEntries": {}, + "files": [], + "artifacts": {} +} \ No newline at end of file diff --git a/pkg/query-service/__debug_bin b/pkg/query-service/__debug_bin deleted file mode 100755 index 2f81072b62..0000000000 Binary files a/pkg/query-service/__debug_bin and /dev/null differ diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go new file mode 100644 index 0000000000..abf66c852d --- /dev/null +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -0,0 +1,124 @@ +package clickhouseReader + +import ( + "time" + + "github.com/jmoiron/sqlx" +) + +type Encoding string + +const ( + // EncodingJSON is used for spans encoded as JSON. + EncodingJSON Encoding = "json" + // EncodingProto is used for spans encoded as Protobuf. + EncodingProto Encoding = "protobuf" +) + +const ( + defaultDatasource string = "tcp://localhost:9000" + defaultOperationsTable string = "signoz_operations" + defaultIndexTable string = "signoz_index" + defaultSpansTable string = "signoz_spans" + defaultArchiveSpansTable string = "signoz_archive_spans" + defaultWriteBatchDelay time.Duration = 5 * time.Second + defaultWriteBatchSize int = 10000 + defaultEncoding Encoding = EncodingJSON +) + +const ( + suffixEnabled = ".enabled" + suffixDatasource = ".datasource" + suffixOperationsTable = ".operations-table" + suffixIndexTable = ".index-table" + suffixSpansTable = ".spans-table" + suffixWriteBatchDelay = ".write-batch-delay" + suffixWriteBatchSize = ".write-batch-size" + suffixEncoding = ".encoding" +) + +// NamespaceConfig is Clickhouse's internal configuration data +type namespaceConfig struct { + namespace string + Enabled bool + Datasource string + OperationsTable string + IndexTable string + SpansTable string + WriteBatchDelay time.Duration + WriteBatchSize int + Encoding Encoding + Connector Connector +} + +// Connecto defines how to connect to the database +type Connector func(cfg *namespaceConfig) (*sqlx.DB, error) + +func defaultConnector(cfg *namespaceConfig) (*sqlx.DB, error) { + db, err := sqlx.Open("clickhouse", cfg.Datasource) + if err != nil { + return nil, err + } + + if err := db.Ping(); err != nil { + return nil, err + } + + return db, nil +} + +// Options store storage plugin related configs +type Options struct { + primary *namespaceConfig + + others map[string]*namespaceConfig +} + +// NewOptions creates a new Options struct. 
+func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...string) *Options { + + if datasource == "" { + datasource = defaultDatasource + } + + options := &Options{ + primary: &namespaceConfig{ + namespace: primaryNamespace, + Enabled: true, + Datasource: datasource, + OperationsTable: defaultOperationsTable, + IndexTable: defaultIndexTable, + SpansTable: defaultSpansTable, + WriteBatchDelay: defaultWriteBatchDelay, + WriteBatchSize: defaultWriteBatchSize, + Encoding: defaultEncoding, + Connector: defaultConnector, + }, + others: make(map[string]*namespaceConfig, len(otherNamespaces)), + } + + for _, namespace := range otherNamespaces { + if namespace == archiveNamespace { + options.others[namespace] = &namespaceConfig{ + namespace: namespace, + Datasource: datasource, + OperationsTable: "", + IndexTable: "", + SpansTable: defaultArchiveSpansTable, + WriteBatchDelay: defaultWriteBatchDelay, + WriteBatchSize: defaultWriteBatchSize, + Encoding: defaultEncoding, + Connector: defaultConnector, + } + } else { + options.others[namespace] = &namespaceConfig{namespace: namespace} + } + } + + return options +} + +// GetPrimary returns the primary namespace configuration +func (opt *Options) getPrimary() *namespaceConfig { + return opt.primary +} diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go new file mode 100644 index 0000000000..84c5e6e079 --- /dev/null +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -0,0 +1,619 @@ +package clickhouseReader + +import ( + "context" + "errors" + "fmt" + "os" + "strconv" + "time" + + _ "github.com/ClickHouse/clickhouse-go" + "github.com/jmoiron/sqlx" + + "go.signoz.io/query-service/model" + "go.uber.org/zap" +) + +const ( + primaryNamespace = "clickhouse" + archiveNamespace = "clickhouse-archive" + + minTimespanForProgressiveSearch = time.Hour + minTimespanForProgressiveSearchMargin = time.Minute + maxProgressiveSteps = 4 +) + +var ( + ErrNoOperationsTable = errors.New("no operations table supplied") + ErrNoIndexTable = errors.New("no index table supplied") + ErrStartTimeRequired = errors.New("start time is required for search queries") +) + +// SpanWriter for reading spans from ClickHouse +type ClickHouseReader struct { + db *sqlx.DB + operationsTable string + indexTable string + spansTable string +} + +// NewTraceReader returns a TraceReader for the database +func NewReader() *ClickHouseReader { + + datasource := os.Getenv("ClickHouseUrl") + options := NewOptions(datasource, primaryNamespace, archiveNamespace) + db, err := initialize(options) + + if err != nil { + zap.S().Error(err) + } + return &ClickHouseReader{ + db: db, + operationsTable: options.primary.OperationsTable, + indexTable: options.primary.IndexTable, + spansTable: options.primary.SpansTable, + } +} + +func initialize(options *Options) (*sqlx.DB, error) { + + db, err := connect(options.getPrimary()) + if err != nil { + return nil, fmt.Errorf("error connecting to primary db: %v", err) + } + + return db, nil +} + +func connect(cfg *namespaceConfig) (*sqlx.DB, error) { + if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto { + return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto) + } + + return cfg.Connector(cfg) +} + +func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, error) { + + if r.indexTable == "" { + return nil, ErrNoIndexTable + } + + serviceItems := 
[]model.ServiceItem{}
+
+	query := fmt.Sprintf("SELECT serviceName, quantile(0.99)(durationNano) as p99, avg(durationNano) as avgDuration, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' GROUP BY serviceName ORDER BY p99 DESC", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
+
+	err := r.db.Select(&serviceItems, query)
+
+	zap.S().Info(query)
+
+	if err != nil {
+		zap.S().Debug("Error in processing sql query: ", err)
+		return nil, fmt.Errorf("Error in processing sql query")
+	}
+
+	////////////////// Below block gets 5xx of services
+	serviceErrorItems := []model.ServiceItem{}
+
+	query = fmt.Sprintf("SELECT serviceName, count(*) as numErrors FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND statusCode>=500 GROUP BY serviceName", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
+
+	err = r.db.Select(&serviceErrorItems, query)
+
+	zap.S().Info(query)
+
+	if err != nil {
+		zap.S().Debug("Error in processing sql query: ", err)
+		return nil, fmt.Errorf("Error in processing sql query")
+	}
+
+	m5xx := make(map[string]int)
+
+	for j, _ := range serviceErrorItems {
+		m5xx[serviceErrorItems[j].ServiceName] = serviceErrorItems[j].NumErrors
+	}
+	///////////////////////////////////////////
+
+	////////////////// Below block gets 4xx of services
+
+	service4xxItems := []model.ServiceItem{}
+
+	query = fmt.Sprintf("SELECT serviceName, count(*) as num4xx FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND statusCode>=400 AND statusCode<500 GROUP BY serviceName", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
+
+	err = r.db.Select(&service4xxItems, query)
+
+	zap.S().Info(query)
+
+	if err != nil {
+		zap.S().Debug("Error in processing sql query: ", err)
+		return nil, fmt.Errorf("Error in processing sql query")
+	}
+
+	m4xx := make(map[string]int)
+
+	for j, _ := range service4xxItems {
+		m4xx[service4xxItems[j].ServiceName] = service4xxItems[j].Num4XX
+	}
+
+	for i, _ := range serviceItems {
+		if val, ok := m5xx[serviceItems[i].ServiceName]; ok {
+			serviceItems[i].NumErrors = val
+		}
+		if val, ok := m4xx[serviceItems[i].ServiceName]; ok {
+			serviceItems[i].Num4XX = val
+		}
+		serviceItems[i].CallRate = float32(serviceItems[i].NumCalls) / float32(queryParams.Period)
+		serviceItems[i].FourXXRate = float32(serviceItems[i].Num4XX) / float32(queryParams.Period)
+		serviceItems[i].ErrorRate = float32(serviceItems[i].NumErrors) / float32(queryParams.Period)
+	}
+
+	return &serviceItems, nil
+}
+
+func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) {
+
+	serviceOverviewItems := []model.ServiceOverviewItem{}
+
+	query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, quantile(0.99)(durationNano) as p99, quantile(0.95)(durationNano) as p95,quantile(0.50)(durationNano) as p50, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND serviceName='%s' GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName)
+
+	err := r.db.Select(&serviceOverviewItems, query)
+
+	zap.S().Info(query)
+
+	if err != nil {
+
zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + serviceErrorItems := []model.ServiceErrorItem{} + + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, count(*) as numErrors FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND serviceName='%s' AND statusCode>=500 GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName) + + err = r.db.Select(&serviceErrorItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + m := make(map[int64]int) + + for j, _ := range serviceErrorItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceErrorItems[j].Time) + m[int64(timeObj.UnixNano())] = serviceErrorItems[j].NumErrors + } + + for i, _ := range serviceOverviewItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceOverviewItems[i].Time) + serviceOverviewItems[i].Timestamp = int64(timeObj.UnixNano()) + serviceOverviewItems[i].Time = "" + + if val, ok := m[serviceOverviewItems[i].Timestamp]; ok { + serviceOverviewItems[i].NumErrors = val + } + serviceOverviewItems[i].ErrorRate = float32(serviceOverviewItems[i].NumErrors) * 100 / float32(serviceOverviewItems[i].NumCalls) + serviceOverviewItems[i].CallRate = float32(serviceOverviewItems[i].NumCalls) / float32(queryParams.StepSeconds) + } + + return &serviceOverviewItems, nil + +} + +func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.SpanSearchParams) (*[]model.SearchSpansResult, error) { + + query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + + args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)} + + if len(queryParams.ServiceName) != 0 { + query = query + " AND serviceName = ?" + args = append(args, queryParams.ServiceName) + } + + if len(queryParams.OperationName) != 0 { + + query = query + " AND name = ?" + args = append(args, queryParams.OperationName) + + } + + if len(queryParams.Kind) != 0 { + query = query + " AND kind = ?" + args = append(args, queryParams.Kind) + + } + + if len(queryParams.MinDuration) != 0 { + query = query + " AND durationNano >= ?" + args = append(args, queryParams.MinDuration) + } + if len(queryParams.MaxDuration) != 0 { + query = query + " AND durationNano <= ?" + args = append(args, queryParams.MaxDuration) + } + + for _, item := range queryParams.Tags { + + if item.Key == "error" && item.Value == "true" { + query = query + " AND ( has(tags, 'error:true') OR statusCode>=500)" + continue + } + + if item.Operator == "equals" { + query = query + " AND has(tags, ?)" + args = append(args, fmt.Sprintf("%s:%s", item.Key, item.Value)) + + } else if item.Operator == "contains" { + query = query + " AND tagsValues[indexOf(tagsKeys, ?)] ILIKE ?" 
+ args = append(args, item.Key) + args = append(args, fmt.Sprintf("%%%s%%", item.Value)) + } else if item.Operator == "isnotnull" { + query = query + " AND has(tagsKeys, ?)" + args = append(args, item.Key) + } else { + return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator) + } + + } + + query = query + " ORDER BY timestamp DESC LIMIT 100" + + var searchScanReponses []model.SearchSpanReponseItem + + err := r.db.Select(&searchScanReponses, query, args...) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + searchSpansResult := []model.SearchSpansResult{ + model.SearchSpansResult{ + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues"}, + Events: make([][]interface{}, len(searchScanReponses)), + }, + } + + for i, item := range searchScanReponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } + + return &searchSpansResult, nil +} + +func (r *ClickHouseReader) GetServiceDBOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) { + + var serviceDBOverviewItems []model.ServiceDBOverviewItem + + query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, dbSystem FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND dbName IS NOT NULL GROUP BY time, dbSystem ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + + err := r.db.Select(&serviceDBOverviewItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + for i, _ := range serviceDBOverviewItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceDBOverviewItems[i].Time) + serviceDBOverviewItems[i].Timestamp = int64(timeObj.UnixNano()) + serviceDBOverviewItems[i].Time = "" + serviceDBOverviewItems[i].CallRate = float32(serviceDBOverviewItems[i].NumCalls) / float32(queryParams.StepSeconds) + } + + if serviceDBOverviewItems == nil { + serviceDBOverviewItems = []model.ServiceDBOverviewItem{} + } + + return &serviceDBOverviewItems, nil + +} + +func (r *ClickHouseReader) GetServiceExternalAvgDuration(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { + + var serviceExternalItems []model.ServiceExternalItem + + query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + + err := r.db.Select(&serviceExternalItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + for i, _ := range serviceExternalItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time) + 
serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano()) + serviceExternalItems[i].Time = "" + serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds) + } + + if serviceExternalItems == nil { + serviceExternalItems = []model.ServiceExternalItem{} + } + + return &serviceExternalItems, nil +} + +func (r *ClickHouseReader) GetServiceExternalErrors(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { + + var serviceExternalErrorItems []model.ServiceExternalItem + + query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL AND statusCode >= 500 GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + + err := r.db.Select(&serviceExternalErrorItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + var serviceExternalTotalItems []model.ServiceExternalItem + + queryTotal := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + + errTotal := r.db.Select(&serviceExternalTotalItems, queryTotal) + + if errTotal != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + m := make(map[string]int) + + for j, _ := range serviceExternalErrorItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalErrorItems[j].Time) + m[strconv.FormatInt(timeObj.UnixNano(), 10)+"-"+serviceExternalErrorItems[j].ExternalHttpUrl] = serviceExternalErrorItems[j].NumCalls + } + + for i, _ := range serviceExternalTotalItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalTotalItems[i].Time) + serviceExternalTotalItems[i].Timestamp = int64(timeObj.UnixNano()) + serviceExternalTotalItems[i].Time = "" + // serviceExternalTotalItems[i].CallRate = float32(serviceExternalTotalItems[i].NumCalls) / float32(queryParams.StepSeconds) + + if val, ok := m[strconv.FormatInt(serviceExternalTotalItems[i].Timestamp, 10)+"-"+serviceExternalTotalItems[i].ExternalHttpUrl]; ok { + serviceExternalTotalItems[i].NumErrors = val + serviceExternalTotalItems[i].ErrorRate = float32(serviceExternalTotalItems[i].NumErrors) * 100 / float32(serviceExternalTotalItems[i].NumCalls) + } + serviceExternalTotalItems[i].CallRate = 0 + serviceExternalTotalItems[i].NumCalls = 0 + + } + + if serviceExternalTotalItems == nil { + serviceExternalTotalItems = []model.ServiceExternalItem{} + } + + return &serviceExternalTotalItems, nil +} + +func (r *ClickHouseReader) GetServiceExternal(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { + + var 
serviceExternalItems []model.ServiceExternalItem + + query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + + err := r.db.Select(&serviceExternalItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + for i, _ := range serviceExternalItems { + timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time) + serviceExternalItems[i].Timestamp = int64(timeObj.UnixNano()) + serviceExternalItems[i].Time = "" + serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds) + } + + if serviceExternalItems == nil { + serviceExternalItems = []model.ServiceExternalItem{} + } + + return &serviceExternalItems, nil +} + +func (r *ClickHouseReader) GetTopEndpoints(ctx context.Context, queryParams *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) { + + var topEndpointsItems []model.TopEndpointsItem + + query := fmt.Sprintf("SELECT quantile(0.5)(durationNano) as p50, quantile(0.95)(durationNano) as p95, quantile(0.99)(durationNano) as p99, COUNT(1) as numCalls, name FROM %s WHERE timestamp >= '%s' AND timestamp <= '%s' AND kind='2' and serviceName='%s' GROUP BY name", r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10), queryParams.ServiceName) + + err := r.db.Select(&topEndpointsItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + if topEndpointsItems == nil { + topEndpointsItems = []model.TopEndpointsItem{} + } + + return &topEndpointsItems, nil +} + +func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) { + + var usageItems []model.UsageItem + + var query string + if len(queryParams.ServiceName) != 0 { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE serviceName='%s' AND timestamp>='%s' AND timestamp<='%s' GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + } else { + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE timestamp>='%s' AND timestamp<='%s' GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + } + + err := r.db.Select(&usageItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + for i, _ := range usageItems { + timeObj, _ := time.Parse(time.RFC3339Nano, usageItems[i].Time) + usageItems[i].Timestamp = int64(timeObj.UnixNano()) + usageItems[i].Time = "" + } + + if 
usageItems == nil { + usageItems = []model.UsageItem{} + } + + return &usageItems, nil +} + +func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) { + + services := []string{} + + query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable) + + err := r.db.Select(&services, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + return &services, nil +} + +func (r *ClickHouseReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) { + + tagItems := []model.TagItem{} + + query := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagsKeys) as tagKeys FROM %s WHERE serviceName='%s' AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable, serviceName) + + err := r.db.Select(&tagItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + return &tagItems, nil +} + +func (r *ClickHouseReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) { + + operations := []string{} + + query := fmt.Sprintf(`SELECT DISTINCT(name) FROM %s WHERE serviceName='%s' AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable, serviceName) + + err := r.db.Select(&operations, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + return &operations, nil +} + +func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) { + + var searchScanReponses []model.SearchSpanReponseItem + + query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues, references FROM %s WHERE traceID='%s'", r.indexTable, traceId) + + err := r.db.Select(&searchScanReponses, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + searchSpansResult := []model.SearchSpansResult{ + model.SearchSpansResult{ + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References"}, + Events: make([][]interface{}, len(searchScanReponses)), + }, + } + + for i, item := range searchScanReponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } + + return &searchSpansResult, nil + +} +func (r *ClickHouseReader) GetServiceMapDependencies(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) { + serviceMapDependencyItems := []model.ServiceMapDependencyItem{} + + query := fmt.Sprintf(`SELECT spanID, parentSpanID, serviceName FROM %s WHERE timestamp>='%s' AND timestamp<='%s'`, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)) + + err := r.db.Select(&serviceMapDependencyItems, query) + + zap.S().Info(query) + + if err != nil { + zap.S().Debug("Error in processing sql query: ", err) + return nil, fmt.Errorf("Error in processing sql query") + } + + serviceMap := make(map[string]*model.ServiceMapDependencyResponseItem) + + spanId2ServiceNameMap := make(map[string]string) + for i, _ := range serviceMapDependencyItems { + 
spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId] = serviceMapDependencyItems[i].ServiceName + } + for i, _ := range serviceMapDependencyItems { + parent2childServiceName := spanId2ServiceNameMap[serviceMapDependencyItems[i].ParentSpanId] + "-" + spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId] + if _, ok := serviceMap[parent2childServiceName]; !ok { + serviceMap[parent2childServiceName] = &model.ServiceMapDependencyResponseItem{ + Parent: spanId2ServiceNameMap[serviceMapDependencyItems[i].ParentSpanId], + Child: spanId2ServiceNameMap[serviceMapDependencyItems[i].SpanId], + CallCount: 1, + } + } else { + serviceMap[parent2childServiceName].CallCount++ + } + } + + retMe := make([]model.ServiceMapDependencyResponseItem, 0, len(serviceMap)) + for _, dependency := range serviceMap { + if dependency.Parent == "" { + continue + } + retMe = append(retMe, *dependency) + } + + return &retMe, nil +} + +func (r *ClickHouseReader) SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) { + + spanSearchAggregatesResponseItems := []model.SpanSearchAggregatesResponseItem{} + + return spanSearchAggregatesResponseItems, nil + +} diff --git a/pkg/query-service/app/druidReader/reader.go b/pkg/query-service/app/druidReader/reader.go new file mode 100644 index 0000000000..e7124e9f48 --- /dev/null +++ b/pkg/query-service/app/druidReader/reader.go @@ -0,0 +1,99 @@ +package druidReader + +import ( + "context" + "os" + + "go.signoz.io/query-service/druidQuery" + "go.signoz.io/query-service/godruid" + "go.signoz.io/query-service/model" +) + +type DruidReader struct { + Client *godruid.Client + SqlClient *druidQuery.SqlClient +} + +func NewReader() *DruidReader { + + initialize() + druidClientUrl := os.Getenv("DruidClientUrl") + + client := godruid.Client{ + Url: druidClientUrl, + Debug: true, + } + + sqlClient := druidQuery.SqlClient{ + Url: druidClientUrl, + Debug: true, + } + return &DruidReader{ + Client: &client, + SqlClient: &sqlClient, + } + +} + +func initialize() { + +} + +func (druid *DruidReader) GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) { + return druidQuery.GetServiceOverview(druid.SqlClient, query) +} + +func (druid *DruidReader) GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, error) { + return druidQuery.GetServices(druid.SqlClient, query) +} + +func (druid *DruidReader) SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error) { + return druidQuery.SearchSpans(druid.Client, query) +} + +func (druid *DruidReader) GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) { + return druidQuery.GetServiceDBOverview(druid.SqlClient, query) +} + +func (druid *DruidReader) GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { + return druidQuery.GetServiceExternalAvgDuration(druid.SqlClient, query) +} + +func (druid *DruidReader) GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { + return druidQuery.GetServiceExternalErrors(druid.SqlClient, query) +} + +func (druid *DruidReader) GetServiceExternal(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { + return 
druidQuery.GetServiceExternal(druid.SqlClient, query) +} + +func (druid *DruidReader) GetTopEndpoints(ctx context.Context, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) { + return druidQuery.GetTopEndpoints(druid.SqlClient, query) +} + +func (druid *DruidReader) GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error) { + return druidQuery.GetUsage(druid.SqlClient, query) +} + +func (druid *DruidReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) { + return druidQuery.GetOperations(druid.SqlClient, serviceName) +} + +func (druid *DruidReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) { + return druidQuery.GetTags(druid.SqlClient, serviceName) +} + +func (druid *DruidReader) GetServicesList(ctx context.Context) (*[]string, error) { + return druidQuery.GetServicesList(druid.SqlClient) +} + +func (druid *DruidReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) { + return druidQuery.SearchTraces(druid.Client, traceId) +} + +func (druid *DruidReader) GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) { + return druidQuery.GetServiceMapDependencies(druid.SqlClient, query) +} +func (druid *DruidReader) SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) { + return druidQuery.SearchSpansAggregate(druid.Client, queryParams) +} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 35798d374e..7d0ec7bbbc 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -1,14 +1,13 @@ package app import ( + "context" "encoding/json" "fmt" "net/http" "github.com/gorilla/mux" "github.com/posthog/posthog-go" - "go.signoz.io/query-service/druidQuery" - "go.signoz.io/query-service/godruid" "go.uber.org/zap" ) @@ -23,17 +22,15 @@ type APIHandler struct { // queryParser queryParser basePath string apiPrefix string - client *godruid.Client - sqlClient *druidQuery.SqlClient + reader *Reader pc *posthog.Client distinctId string } // NewAPIHandler returns an APIHandler -func NewAPIHandler(client *godruid.Client, sqlClient *druidQuery.SqlClient, pc *posthog.Client, distinctId string) *APIHandler { +func NewAPIHandler(reader *Reader, pc *posthog.Client, distinctId string) *APIHandler { aH := &APIHandler{ - client: client, - sqlClient: sqlClient, + reader: reader, pc: pc, distinctId: distinctId, } @@ -59,7 +56,7 @@ type structuredError struct { func (aH *APIHandler) RegisterRoutes(router *mux.Router) { router.HandleFunc("/api/v1/user", aH.user).Methods(http.MethodPost) - router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet) + // router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet) router.HandleFunc("/api/v1/services", aH.getServices).Methods(http.MethodGet) router.HandleFunc("/api/v1/services/list", aH.getServicesList).Methods(http.MethodGet) router.HandleFunc("/api/v1/service/overview", aH.getServiceOverview).Methods(http.MethodGet) @@ -115,7 +112,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) { return } - result, err := druidQuery.GetOperations(aH.sqlClient, serviceName) + result, err := (*aH.reader).GetOperations(context.Background(), serviceName) if aH.handleError(w, err, http.StatusBadRequest) 
{ return } @@ -126,7 +123,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) getServicesList(w http.ResponseWriter, r *http.Request) { - result, err := druidQuery.GetServicesList(aH.sqlClient) + result, err := (*aH.reader).GetServicesList(context.Background()) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -139,7 +136,7 @@ func (aH *APIHandler) searchTags(w http.ResponseWriter, r *http.Request) { serviceName := r.URL.Query().Get("service") - result, err := druidQuery.GetTags(aH.sqlClient, serviceName) + result, err := (*aH.reader).GetTags(context.Background(), serviceName) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -155,7 +152,8 @@ func (aH *APIHandler) getTopEndpoints(w http.ResponseWriter, r *http.Request) { return } - result, err := druidQuery.GetTopEndpoints(aH.sqlClient, query) + result, err := (*aH.reader).GetTopEndpoints(context.Background(), query) + if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -171,7 +169,7 @@ func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) { return } - result, err := druidQuery.GetUsage(aH.sqlClient, query) + result, err := (*aH.reader).GetUsage(context.Background(), query) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -187,7 +185,8 @@ func (aH *APIHandler) getServiceDBOverview(w http.ResponseWriter, r *http.Reques return } - result, err := druidQuery.GetServiceDBOverview(aH.sqlClient, query) + result, err := (*aH.reader).GetServiceDBOverview(context.Background(), query) + if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -203,7 +202,7 @@ func (aH *APIHandler) getServiceExternal(w http.ResponseWriter, r *http.Request) return } - result, err := druidQuery.GetServiceExternal(aH.sqlClient, query) + result, err := (*aH.reader).GetServiceExternal(context.Background(), query) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -219,7 +218,7 @@ func (aH *APIHandler) GetServiceExternalAvgDuration(w http.ResponseWriter, r *ht return } - result, err := druidQuery.GetServiceExternalAvgDuration(aH.sqlClient, query) + result, err := (*aH.reader).GetServiceExternalAvgDuration(context.Background(), query) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -235,7 +234,7 @@ func (aH *APIHandler) getServiceExternalErrors(w http.ResponseWriter, r *http.Re return } - result, err := druidQuery.GetServiceExternalErrors(aH.sqlClient, query) + result, err := (*aH.reader).GetServiceExternalErrors(context.Background(), query) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -251,7 +250,7 @@ func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) return } - result, err := druidQuery.GetServiceOverview(aH.sqlClient, query) + result, err := (*aH.reader).GetServiceOverview(context.Background(), query) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -267,7 +266,7 @@ func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) { return } - result, err := druidQuery.GetServices(aH.sqlClient, query) + result, err := (*aH.reader).GetServices(context.Background(), query) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -289,7 +288,7 @@ func (aH *APIHandler) serviceMapDependencies(w http.ResponseWriter, r *http.Requ return } - result, err := druidQuery.GetServiceMapDependencies(aH.sqlClient, query) + result, err := (*aH.reader).GetServiceMapDependencies(context.Background(), query) if aH.handleError(w, err, http.StatusBadRequest) { return 
} @@ -302,7 +301,7 @@ func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) traceId := vars["traceId"] - result, err := druidQuery.SearchTraces(aH.client, traceId) + result, err := (*aH.reader).SearchTraces(context.Background(), traceId) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -310,6 +309,7 @@ func (aH *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) { aH.writeJSON(w, r, result) } + func (aH *APIHandler) searchSpansAggregates(w http.ResponseWriter, r *http.Request) { query, err := parseSearchSpanAggregatesRequest(r) @@ -317,7 +317,7 @@ func (aH *APIHandler) searchSpansAggregates(w http.ResponseWriter, r *http.Reque return } - result, err := druidQuery.SearchSpansAggregate(aH.client, query) + result, err := (*aH.reader).SearchSpansAggregate(context.Background(), query) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -332,7 +332,9 @@ func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) { return } - result, err := druidQuery.SearchSpans(aH.client, query) + // result, err := druidQuery.SearchSpans(aH.client, query) + result, err := (*aH.reader).SearchSpans(context.Background(), query) + if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -340,20 +342,20 @@ func (aH *APIHandler) searchSpans(w http.ResponseWriter, r *http.Request) { aH.writeJSON(w, r, result) } -func (aH *APIHandler) getApplicationPercentiles(w http.ResponseWriter, r *http.Request) { - // vars := mux.Vars(r) +// func (aH *APIHandler) getApplicationPercentiles(w http.ResponseWriter, r *http.Request) { +// // vars := mux.Vars(r) - query, err := parseApplicationPercentileRequest(r) - if aH.handleError(w, err, http.StatusBadRequest) { - return - } +// query, err := parseApplicationPercentileRequest(r) +// if aH.handleError(w, err, http.StatusBadRequest) { +// return +// } - result, err := druidQuery.GetApplicationPercentiles(aH.client, query) - if aH.handleError(w, err, http.StatusBadRequest) { - return - } - aH.writeJSON(w, r, result) -} +// result, err := (*aH.reader).GetApplicationPercentiles(context.Background(), query) +// if aH.handleError(w, err, http.StatusBadRequest) { +// return +// } +// aH.writeJSON(w, r, result) +// } func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode int) bool { if err == nil { diff --git a/pkg/query-service/app/interface.go b/pkg/query-service/app/interface.go new file mode 100644 index 0000000000..8f00bfef71 --- /dev/null +++ b/pkg/query-service/app/interface.go @@ -0,0 +1,26 @@ +package app + +import ( + "context" + + "go.signoz.io/query-service/model" +) + +type Reader interface { + GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) + GetServices(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceItem, error) + // GetApplicationPercentiles(ctx context.Context, query *model.ApplicationPercentileParams) ([]godruid.Timeseries, error) + SearchSpans(ctx context.Context, query *model.SpanSearchParams) (*[]model.SearchSpansResult, error) + GetServiceDBOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) + GetServiceExternalAvgDuration(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) + GetServiceExternalErrors(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) + GetServiceExternal(ctx context.Context, query 
*model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) + GetTopEndpoints(ctx context.Context, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) + GetUsage(ctx context.Context, query *model.GetUsageParams) (*[]model.UsageItem, error) + GetOperations(ctx context.Context, serviceName string) (*[]string, error) + GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) + GetServicesList(ctx context.Context) (*[]string, error) + SearchTraces(ctx context.Context, traceID string) (*[]model.SearchSpansResult, error) + GetServiceMapDependencies(ctx context.Context, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) + SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) +} diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index 2dd1714ffc..a586dc6f8b 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -38,6 +38,8 @@ func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams, StartTime: startTime.Format(time.RFC3339Nano), EndTime: endTime.Format(time.RFC3339Nano), ServiceName: serviceName, + Start: startTime, + End: endTime, } return &getTopEndpointsParams, nil @@ -64,12 +66,16 @@ func parseGetUsageRequest(r *http.Request) (*model.GetUsageParams, error) { } serviceName := r.URL.Query().Get("service") + stepHour := stepInt / 3600 getUsageParams := model.GetUsageParams{ StartTime: startTime.Format(time.RFC3339Nano), EndTime: endTime.Format(time.RFC3339Nano), + Start: startTime, + End: endTime, ServiceName: serviceName, - Period: fmt.Sprintf("PT%dH", stepInt/3600), + Period: fmt.Sprintf("PT%dH", stepHour), + StepHour: stepHour, } return &getUsageParams, nil @@ -101,7 +107,9 @@ func parseGetServiceExternalRequest(r *http.Request) (*model.GetServiceOverviewP } getServiceOverviewParams := model.GetServiceOverviewParams{ + Start: startTime, StartTime: startTime.Format(time.RFC3339Nano), + End: endTime, EndTime: endTime.Format(time.RFC3339Nano), ServiceName: serviceName, Period: fmt.Sprintf("PT%dM", stepInt/60), @@ -137,7 +145,9 @@ func parseGetServiceOverviewRequest(r *http.Request) (*model.GetServiceOverviewP } getServiceOverviewParams := model.GetServiceOverviewParams{ + Start: startTime, StartTime: startTime.Format(time.RFC3339Nano), + End: endTime, EndTime: endTime.Format(time.RFC3339Nano), ServiceName: serviceName, Period: fmt.Sprintf("PT%dM", stepInt/60), @@ -160,7 +170,9 @@ func parseGetServicesRequest(r *http.Request) (*model.GetServicesParams, error) } getServicesParams := model.GetServicesParams{ + Start: startTime, StartTime: startTime.Format(time.RFC3339Nano), + End: endTime, EndTime: endTime.Format(time.RFC3339Nano), Period: int(endTime.Unix() - startTime.Unix()), } @@ -283,6 +295,8 @@ func parseSpanSearchRequest(r *http.Request) (*model.SpanSearchParams, error) { // fmt.Println(startTimeStr) params := &model.SpanSearchParams{ Intervals: fmt.Sprintf("%s/%s", startTimeStr, endTimeStr), + Start: startTime, + End: endTime, Limit: 100, Order: "descending", } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 4abf7b7e5a..99c47f5539 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -1,8 +1,10 @@ package app import ( + "fmt" "net" "net/http" + "os" "time" "github.com/google/uuid" @@ -11,16 +13,16 @@ import ( "github.com/posthog/posthog-go" "github.com/rs/cors" 
"github.com/soheilhy/cmux" - "go.signoz.io/query-service/druidQuery" - "go.signoz.io/query-service/godruid" + "go.signoz.io/query-service/app/clickhouseReader" + "go.signoz.io/query-service/app/druidReader" "go.signoz.io/query-service/healthcheck" "go.signoz.io/query-service/utils" "go.uber.org/zap" ) type ServerOptions struct { - HTTPHostPort string - DruidClientUrl string + HTTPHostPort string + // DruidClientUrl string } // Server runs HTTP, Mux and a grpc server @@ -28,11 +30,10 @@ type Server struct { // logger *zap.Logger // querySvc *querysvc.QueryService // queryOptions *QueryOptions - serverOptions *ServerOptions // tracer opentracing.Tracer // TODO make part of flags.Service - - conn net.Listener + serverOptions *ServerOptions + conn net.Listener // grpcConn net.Listener httpConn net.Listener // grpcServer *grpc.Server @@ -64,6 +65,11 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { // if err != nil { // return nil, err // } + httpServer, err := createHTTPServer() + + if err != nil { + return nil, err + } return &Server{ // logger: logger, @@ -72,7 +78,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { // tracer: tracer, // grpcServer: grpcServer, serverOptions: serverOptions, - httpServer: createHTTPServer(serverOptions.DruidClientUrl), + httpServer: httpServer, separatePorts: true, // separatePorts: grpcPort != httpPort, unavailableChannel: make(chan healthcheck.Status), @@ -82,22 +88,25 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { var posthogClient posthog.Client var distinctId string -func createHTTPServer(druidClientUrl string) *http.Server { +func createHTTPServer() (*http.Server, error) { posthogClient = posthog.New("H-htDCae7CR3RV57gUzmol6IAKtm5IMCvbcm_fwnL-w") distinctId = uuid.New().String() - client := godruid.Client{ - Url: druidClientUrl, - Debug: true, + var reader Reader + + storage := os.Getenv("STORAGE") + if storage == "druid" { + zap.S().Info("Using Apache Druid as datastore ...") + reader = druidReader.NewReader() + } else if storage == "clickhouse" { + zap.S().Info("Using ClickHouse as datastore ...") + reader = clickhouseReader.NewReader() + } else { + return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage) } - sqlClient := druidQuery.SqlClient{ - Url: druidClientUrl, - Debug: true, - } - - apiHandler := NewAPIHandler(&client, &sqlClient, &posthogClient, distinctId) + apiHandler := NewAPIHandler(&reader, &posthogClient, distinctId) r := NewRouter() r.Use(analyticsMiddleware) @@ -118,7 +127,7 @@ func createHTTPServer(druidClientUrl string) *http.Server { return &http.Server{ Handler: handler, - } + }, nil } func loggingMiddleware(next http.Handler) http.Handler { diff --git a/pkg/query-service/druidQuery/mysql-query.go b/pkg/query-service/druidQuery/mysql-query.go index ff43dac6e1..07fd7ed251 100644 --- a/pkg/query-service/druidQuery/mysql-query.go +++ b/pkg/query-service/druidQuery/mysql-query.go @@ -11,92 +11,6 @@ import ( "go.uber.org/zap" ) -type ServiceItem struct { - ServiceName string `json:"serviceName"` - Percentile99 float32 `json:"p99"` - AvgDuration float32 `json:"avgDuration"` - NumCalls int `json:"numCalls"` - CallRate float32 `json:"callRate"` - NumErrors int `json:"numErrors"` - ErrorRate float32 `json:"errorRate"` - Num4XX int `json:"num4XX"` - FourXXRate float32 `json:"fourXXRate"` -} -type ServiceListErrorItem struct { - ServiceName string `json:"serviceName"` - NumErrors int `json:"numErrors"` - Num4xx int `json:"num4xx"` -} - -type ServiceErrorItem 
struct { - Time string `json:"time,omitempty"` - Timestamp int64 `json:"timestamp"` - NumErrors int `json:"numErrors"` -} - -type ServiceOverviewItem struct { - Time string `json:"time,omitempty"` - Timestamp int64 `json:"timestamp"` - Percentile50 float32 `json:"p50"` - Percentile95 float32 `json:"p95"` - Percentile99 float32 `json:"p99"` - NumCalls int `json:"numCalls"` - CallRate float32 `json:"callRate"` - NumErrors int `json:"numErrors"` - ErrorRate float32 `json:"errorRate"` -} - -type ServiceExternalItem struct { - Time string `json:"time,omitempty"` - Timestamp int64 `json:"timestamp,omitempty"` - ExternalHttpUrl string `json:"externalHttpUrl,omitempty"` - AvgDuration float32 `json:"avgDuration,omitempty"` - NumCalls int `json:"numCalls,omitempty"` - CallRate float32 `json:"callRate,omitempty"` - NumErrors int `json:"numErrors"` - ErrorRate float32 `json:"errorRate"` -} - -type ServiceDBOverviewItem struct { - Time string `json:"time,omitempty"` - Timestamp int64 `json:"timestamp,omitempty"` - DBSystem string `json:"dbSystem,omitempty"` - AvgDuration float32 `json:"avgDuration,omitempty"` - NumCalls int `json:"numCalls,omitempty"` - CallRate float32 `json:"callRate,omitempty"` -} - -type ServiceMapDependencyItem struct { - SpanId string `json:"spanId,omitempty"` - ParentSpanId string `json:"parentSpanId,omitempty"` - ServiceName string `json:"serviceName,omitempty"` -} - -type UsageItem struct { - Time string `json:"time,omitempty"` - Timestamp int64 `json:"timestamp"` - Count int64 `json:"count"` -} - -type TopEnpointsItem struct { - Percentile50 float32 `json:"p50"` - Percentile90 float32 `json:"p90"` - Percentile99 float32 `json:"p99"` - NumCalls int `json:"numCalls"` - Name string `json:"name"` -} - -type TagItem struct { - TagKeys string `json:"tagKeys"` - TagCount int `json:"tagCount"` -} - -type ServiceMapDependencyResponseItem struct { - Parent string `json:"parent,omitempty"` - Child string `json:"child,omitempty"` - CallCount int `json:"callCount,omitempty"` -} - func GetOperations(client *SqlClient, serviceName string) (*[]string, error) { sqlQuery := fmt.Sprintf(`SELECT DISTINCT(Name) FROM %s WHERE ServiceName='%s' AND __time > CURRENT_TIMESTAMP - INTERVAL '1' DAY`, constants.DruidDatasource, serviceName) @@ -155,7 +69,7 @@ func GetServicesList(client *SqlClient) (*[]string, error) { return &servicesListReponse, nil } -func GetTags(client *SqlClient, serviceName string) (*[]TagItem, error) { +func GetTags(client *SqlClient, serviceName string) (*[]model.TagItem, error) { var sqlQuery string @@ -176,7 +90,7 @@ func GetTags(client *SqlClient, serviceName string) (*[]TagItem, error) { // zap.S().Info(string(response)) - res := new([]TagItem) + res := new([]model.TagItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -187,9 +101,9 @@ func GetTags(client *SqlClient, serviceName string) (*[]TagItem, error) { return &tagResponse, nil } -func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]TopEnpointsItem, error) { +func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) { - sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.9) as p90, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT(SpanId) as numCalls, Name FROM "%s" WHERE "__time" >= '%s' AND "__time" <= '%s' AND "Kind"='2' and "ServiceName"='%s' GROUP BY Name`, constants.DruidDatasource, query.StartTime, query.EndTime, 
query.ServiceName) + sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.95) as p95, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT(SpanId) as numCalls, Name FROM "%s" WHERE "__time" >= '%s' AND "__time" <= '%s' AND "Kind"='2' and "ServiceName"='%s' GROUP BY Name`, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName) // zap.S().Debug(sqlQuery) @@ -202,7 +116,7 @@ func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[] // zap.S().Info(string(response)) - res := new([]TopEnpointsItem) + res := new([]model.TopEndpointsItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -213,7 +127,7 @@ func GetTopEndpoints(client *SqlClient, query *model.GetTopEndpointsParams) (*[] return &topEnpointsResponse, nil } -func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]UsageItem, error) { +func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]model.UsageItem, error) { var sqlQuery string @@ -236,7 +150,7 @@ func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]UsageItem, err // zap.S().Info(string(response)) - res := new([]UsageItem) + res := new([]model.UsageItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -253,7 +167,7 @@ func GetUsage(client *SqlClient, query *model.GetUsageParams) (*[]UsageItem, err return &usageResponse, nil } -func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) { +func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration" FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND "__time" >= '%s' AND "__time" <= '%s' GROUP BY TIME_FLOOR(__time, '%s')`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period) @@ -270,7 +184,7 @@ func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOve // responseStr := string(response) // zap.S().Info(responseStr) - res := new([]ServiceExternalItem) + res := new([]model.ServiceExternalItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -289,7 +203,7 @@ func GetServiceExternalAvgDuration(client *SqlClient, query *model.GetServiceOve return &servicesExternalResponse, nil } -func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) { +func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", COUNT(SpanId) as "numCalls", ExternalHttpUrl as externalHttpUrl FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND StatusCode >= 500 AND "__time" >= '%s' AND "__time" <= '%s' GROUP BY TIME_FLOOR(__time, '%s'), ExternalHttpUrl`, query.Period, constants.DruidDatasource, query.ServiceName, query.StartTime, query.EndTime, query.Period) @@ -306,7 +220,7 @@ func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverview // responseStr := string(response) // zap.S().Info(responseStr) - res := new([]ServiceExternalItem) + res := new([]model.ServiceExternalItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -328,7 +242,7 @@ func 
GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverview // responseStr := string(response) // zap.S().Info(responseStr) - resTotal := new([]ServiceExternalItem) + resTotal := new([]model.ServiceExternalItem) err = json.Unmarshal(responseTotal, resTotal) if err != nil { zap.S().Error(err) @@ -361,7 +275,7 @@ func GetServiceExternalErrors(client *SqlClient, query *model.GetServiceOverview return &servicesExternalResponse, nil } -func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceExternalItem, error) { +func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) { sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration", COUNT(SpanId) as "numCalls", ExternalHttpUrl as externalHttpUrl FROM %s WHERE ServiceName='%s' AND Kind='3' AND ExternalHttpUrl != '' AND "__time" >= '%s' AND "__time" <= '%s' @@ -379,7 +293,7 @@ func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams // responseStr := string(response) // zap.S().Info(responseStr) - res := new([]ServiceExternalItem) + res := new([]model.ServiceExternalItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -398,7 +312,7 @@ func GetServiceExternal(client *SqlClient, query *model.GetServiceOverviewParams return &servicesExternalResponse, nil } -func GetServiceDBOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceDBOverviewItem, error) { +func GetServiceDBOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) { sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", AVG(DurationNano) as "avgDuration", COUNT(SpanId) as "numCalls", DBSystem as "dbSystem" FROM %s WHERE ServiceName='%s' AND Kind='3' AND DBName IS NOT NULL AND "__time" >= '%s' AND "__time" <= '%s' @@ -416,7 +330,7 @@ func GetServiceDBOverview(client *SqlClient, query *model.GetServiceOverviewPara // responseStr := string(response) // zap.S().Info(responseStr) - res := new([]ServiceDBOverviewItem) + res := new([]model.ServiceDBOverviewItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -435,7 +349,7 @@ func GetServiceDBOverview(client *SqlClient, query *model.GetServiceOverviewPara return &servicesDBOverviewResponse, nil } -func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]ServiceOverviewItem, error) { +func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) { sqlQuery := fmt.Sprintf(`SELECT TIME_FLOOR(__time, '%s') as "time", APPROX_QUANTILE_DS("QuantileDuration", 0.5) as p50, APPROX_QUANTILE_DS("QuantileDuration", 0.95) as p95, APPROX_QUANTILE_DS("QuantileDuration", 0.99) as p99, COUNT("SpanId") as "numCalls" FROM "%s" WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' and "ServiceName"='%s' GROUP BY TIME_FLOOR(__time, '%s') `, query.Period, constants.DruidDatasource, query.StartTime, query.EndTime, query.ServiceName, query.Period) @@ -451,7 +365,7 @@ func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams // zap.S().Info(string(response)) - res := new([]ServiceOverviewItem) + res := new([]model.ServiceOverviewItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -471,7 +385,7 @@ func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams // zap.S().Info(string(response)) - 
resError := new([]ServiceErrorItem) + resError := new([]model.ServiceErrorItem) err = json.Unmarshal(responseError, resError) if err != nil { zap.S().Error(err) @@ -501,7 +415,7 @@ func GetServiceOverview(client *SqlClient, query *model.GetServiceOverviewParams return &servicesOverviewResponse, nil } -func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceItem, error) { +func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]model.ServiceItem, error) { sqlQuery := fmt.Sprintf(`SELECT APPROX_QUANTILE_DS("QuantileDuration", 0.99) as "p99", AVG("DurationNano") as "avgDuration", COUNT(SpanId) as numCalls, "ServiceName" as "serviceName" FROM %s WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' GROUP BY "ServiceName" ORDER BY "p99" DESC`, constants.DruidDatasource, query.StartTime, query.EndTime) @@ -516,7 +430,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI // zap.S().Info(string(response)) - res := new([]ServiceItem) + res := new([]model.ServiceItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -538,7 +452,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI // zap.S().Info(string(response)) - resError := new([]ServiceListErrorItem) + resError := new([]model.ServiceListErrorItem) err = json.Unmarshal(responseError, resError) if err != nil { zap.S().Error(err) @@ -555,7 +469,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI ////////////////// Below block gets 4xx of services - sqlQuery = fmt.Sprintf(`SELECT COUNT(SpanId) as numErrors, "ServiceName" as "serviceName" FROM %s WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' and "StatusCode">=400 and "StatusCode" < 500 GROUP BY "ServiceName"`, constants.DruidDatasource, query.StartTime, query.EndTime) + sqlQuery = fmt.Sprintf(`SELECT COUNT(SpanId) as num4xx, "ServiceName" as "serviceName" FROM %s WHERE "__time" >= '%s' and "__time" <= '%s' and "Kind"='2' and "StatusCode">=400 and "StatusCode" < 500 GROUP BY "ServiceName"`, constants.DruidDatasource, query.StartTime, query.EndTime) response4xx, err := client.Query(sqlQuery, "object") @@ -568,7 +482,7 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI // zap.S().Info(string(response)) - res4xx := new([]ServiceListErrorItem) + res4xx := new([]model.ServiceListErrorItem) err = json.Unmarshal(response4xx, res4xx) if err != nil { zap.S().Error(err) @@ -601,9 +515,9 @@ func GetServices(client *SqlClient, query *model.GetServicesParams) (*[]ServiceI return &servicesResponse, nil } -func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams) (*[]ServiceMapDependencyResponseItem, error) { +func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) { - sqlQuery := fmt.Sprintf(`SELECT SpanId, ParentSpanId, ServiceName FROM %s WHERE "__time" >= '%s' AND "__time" <= '%s' ORDER BY __time DESC LIMIT 100000`, constants.DruidDatasource, query.StartTime, query.EndTime) + sqlQuery := fmt.Sprintf(`SELECT SpanId, ParentSpanId, ServiceName FROM %s WHERE "__time" >= '%s' AND "__time" <= '%s' ORDER BY __time DESC`, constants.DruidDatasource, query.StartTime, query.EndTime) // zap.S().Debug(sqlQuery) @@ -617,7 +531,7 @@ func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams // responseStr := string(response) // zap.S().Info(responseStr) - res := new([]ServiceMapDependencyItem) + 
res := new([]model.ServiceMapDependencyItem) err = json.Unmarshal(response, res) if err != nil { zap.S().Error(err) @@ -626,7 +540,7 @@ func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams // resCount := len(*res) // fmt.Println(resCount) - serviceMap := make(map[string]*ServiceMapDependencyResponseItem) + serviceMap := make(map[string]*model.ServiceMapDependencyResponseItem) spanId2ServiceNameMap := make(map[string]string) for i, _ := range *res { @@ -635,7 +549,7 @@ func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams for i, _ := range *res { parent2childServiceName := spanId2ServiceNameMap[(*res)[i].ParentSpanId] + "-" + spanId2ServiceNameMap[(*res)[i].SpanId] if _, ok := serviceMap[parent2childServiceName]; !ok { - serviceMap[parent2childServiceName] = &ServiceMapDependencyResponseItem{ + serviceMap[parent2childServiceName] = &model.ServiceMapDependencyResponseItem{ Parent: spanId2ServiceNameMap[(*res)[i].ParentSpanId], Child: spanId2ServiceNameMap[(*res)[i].SpanId], CallCount: 1, @@ -645,7 +559,7 @@ func GetServiceMapDependencies(client *SqlClient, query *model.GetServicesParams } } - retMe := make([]ServiceMapDependencyResponseItem, 0, len(serviceMap)) + retMe := make([]model.ServiceMapDependencyResponseItem, 0, len(serviceMap)) for _, dependency := range serviceMap { if dependency.Parent == "" { continue diff --git a/pkg/query-service/druidQuery/query.go b/pkg/query-service/druidQuery/query.go index 7aabf4cef0..b618b5ce1d 100644 --- a/pkg/query-service/druidQuery/query.go +++ b/pkg/query-service/druidQuery/query.go @@ -27,11 +27,6 @@ type SpanSearchAggregatesDuratonReceivedItem struct { Result DurationItem `json:"result"` } -type SpanSearchAggregatesResponseItem struct { - Timestamp int64 `json:"timestamp"` - Value float32 `json:"value"` -} - func buildFilters(queryParams *model.SpanSearchParams) (*godruid.Filter, error) { var filter *godruid.Filter @@ -181,7 +176,7 @@ func buildFiltersForSpansAggregates(queryParams *model.SpanSearchAggregatesParam } -func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult, error) { +func SearchTraces(client *godruid.Client, traceId string) (*[]model.SearchSpansResult, error) { filter := godruid.FilterSelector("TraceId", traceId) @@ -206,10 +201,20 @@ func SearchTraces(client *godruid.Client, traceId string) ([]godruid.ScanResult, // fmt.Printf("query.QueryResult:\n%v", query.QueryResult) - return query.QueryResult, nil + var searchSpansResult []model.SearchSpansResult + searchSpansResult = make([]model.SearchSpansResult, len(query.QueryResult)) + + searchSpansResult[0].Columns = make([]string, len(query.QueryResult[0].Columns)) + copy(searchSpansResult[0].Columns, query.QueryResult[0].Columns) + + searchSpansResult[0].Events = make([][]interface{}, len(query.QueryResult[0].Events)) + copy(searchSpansResult[0].Events, query.QueryResult[0].Events) + + return &searchSpansResult, nil + } -func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchAggregatesParams) ([]SpanSearchAggregatesResponseItem, error) { +func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) { filter, err := buildFiltersForSpansAggregates(queryParams) var needsPostAggregation bool = true @@ -293,7 +298,7 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA return nil, fmt.Errorf("Error in unmarshalling response from druid") } - var response 
[]SpanSearchAggregatesResponseItem + var response []model.SpanSearchAggregatesResponseItem for _, elem := range *receivedResponse { @@ -304,7 +309,7 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA if queryParams.AggregationOption == "rate_per_sec" { value = elem.Result.Value * 1.0 / float32(queryParams.StepSeconds) } - response = append(response, SpanSearchAggregatesResponseItem{ + response = append(response, model.SpanSearchAggregatesResponseItem{ Timestamp: timestamp, Value: value, }) @@ -316,7 +321,7 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA return nil, nil } -func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) ([]godruid.ScanResult, error) { +func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) (*[]model.SearchSpansResult, error) { filter, err := buildFilters(queryParams) @@ -347,7 +352,16 @@ func SearchSpans(client *godruid.Client, queryParams *model.SpanSearchParams) ([ // fmt.Printf("query.QueryResult:\n%v", query.QueryResult) - return query.QueryResult, nil + var searchSpansResult []model.SearchSpansResult + searchSpansResult = make([]model.SearchSpansResult, len(query.QueryResult)) + + searchSpansResult[0].Columns = make([]string, len(query.QueryResult[0].Columns)) + copy(searchSpansResult[0].Columns, query.QueryResult[0].Columns) + + searchSpansResult[0].Events = make([][]interface{}, len(query.QueryResult[0].Events)) + copy(searchSpansResult[0].Events, query.QueryResult[0].Events) + + return &searchSpansResult, nil } func GetApplicationPercentiles(client *godruid.Client, queryParams *model.ApplicationPercentileParams) ([]godruid.Timeseries, error) { diff --git a/pkg/query-service/go.mod b/pkg/query-service/go.mod index 2cf5d0eccf..1f6e1c9859 100644 --- a/pkg/query-service/go.mod +++ b/pkg/query-service/go.mod @@ -3,10 +3,14 @@ module go.signoz.io/query-service go 1.14 require ( + github.com/ClickHouse/clickhouse-go v1.4.5 + github.com/gogo/protobuf v1.2.1 github.com/google/uuid v1.1.1 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 github.com/jaegertracing/jaeger v1.21.0 + github.com/jmoiron/sqlx v1.3.4 + github.com/opentracing/opentracing-go v1.1.0 github.com/ory/viper v1.7.5 github.com/posthog/posthog-go v0.0.0-20200525173953-e46dc8e6b89b github.com/rs/cors v1.7.0 diff --git a/pkg/query-service/go.sum b/pkg/query-service/go.sum index 4cce05fccc..e75469dc1e 100644 --- a/pkg/query-service/go.sum +++ b/pkg/query-service/go.sum @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/ClickHouse/clickhouse-go v1.4.5 h1:FfhyEnv6/BaWldyjgT2k4gDDmeNwJ9C4NbY/MXxJlXk= +github.com/ClickHouse/clickhouse-go v1.4.5/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/DataDog/zstd v1.4.4/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/HdrHistogram/hdrhistogram-go v0.9.0/go.mod h1:nxrse8/Tzg2tg3DZcZjm6qEclQKK70g0KxO61gFFZD4= @@ -40,6 +42,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/bsm/sarama-cluster v2.1.13+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= @@ -50,6 +53,7 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= @@ -166,6 +170,7 @@ github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2K github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo= github.com/go-openapi/validate v0.19.8/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= @@ -277,8 +282,12 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jaegertracing/jaeger v1.21.0 h1:Fgre3vTI5E/cmkXKBXK7ksnzul5b/3gXjA3mQzt0+58= github.com/jaegertracing/jaeger v1.21.0/go.mod h1:PCTGGFohQBPQMR4j333V5lt6If7tj8aWJ+pQNgvZ+wU= +github.com/jaegertracing/jaeger v1.22.0 h1:kFBhBn9XSB8V68DjD3t6qb/IUAJLLtyJ/27caGQOu7E= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= +github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w= +github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go 
v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -304,6 +313,7 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= @@ -329,6 +339,8 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 5749921770..e261ebf9e6 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -57,14 +57,12 @@ func main() { logger := loggerMgr.Sugar() logger.Debug("START!") - // v := initViper() - serverOptions := &app.ServerOptions{ // HTTPHostPort: v.GetString(app.HTTPHostPort), // DruidClientUrl: v.GetString(app.DruidClientUrl), - HTTPHostPort: constants.HTTPHostPort, - DruidClientUrl: constants.DruidClientUrl, + HTTPHostPort: constants.HTTPHostPort, + // DruidClientUrl: constants.DruidClientUrl, } server, err := app.NewServer(serverOptions) diff --git a/pkg/query-service/model/model.go b/pkg/query-service/model/queryParams.go similarity index 83% rename from pkg/query-service/model/model.go rename to pkg/query-service/model/queryParams.go index f0c4f4d8cc..49a0ed5b28 100644 --- a/pkg/query-service/model/model.go +++ b/pkg/query-service/model/queryParams.go @@ -1,11 +1,16 @@ package model -import "fmt" +import ( + "fmt" + "time" +) type GetTopEndpointsParams struct { StartTime string EndTime string ServiceName string + Start *time.Time + End *time.Time } type GetUsageParams struct { @@ -13,17 +18,24 @@ type GetUsageParams struct { EndTime string ServiceName string Period string + StepHour int + Start *time.Time + End *time.Time } type GetServicesParams struct { StartTime string EndTime string Period int + Start *time.Time + End *time.Time } type GetServiceOverviewParams struct { StartTime string EndTime string + Start *time.Time + End *time.Time ServiceName string Period string StepSeconds int @@ -67,6 +79,8 @@ type SpanSearchParams struct { OperationName string Kind string Intervals string + Start *time.Time + End *time.Time MinDuration string MaxDuration string Limit int64 diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go new file mode 100644 index 
0000000000..841a8eb86c --- /dev/null +++ b/pkg/query-service/model/response.go @@ -0,0 +1,169 @@ +package model + +import ( + "encoding/json" + "fmt" + "strconv" + "time" +) + +type ServiceItem struct { + ServiceName string `json:"serviceName" db:"serviceName"` + Percentile99 float32 `json:"p99" db:"p99"` + AvgDuration float32 `json:"avgDuration" db:"avgDuration"` + NumCalls int `json:"numCalls" db:"numCalls"` + CallRate float32 `json:"callRate" db:"callRate"` + NumErrors int `json:"numErrors" db:"numErrors"` + ErrorRate float32 `json:"errorRate" db:"errorRate"` + Num4XX int `json:"num4XX" db:"num4xx"` + FourXXRate float32 `json:"fourXXRate" db:"fourXXRate"` +} + +type ServiceListErrorItem struct { + ServiceName string `json:"serviceName"` + NumErrors int `json:"numErrors"` + Num4xx int `json:"num4xx"` +} + +type ServiceErrorItem struct { + Time string `json:"time,omitempty" db:"time,omitempty"` + Timestamp int64 `json:"timestamp" db:"timestamp"` + NumErrors int `json:"numErrors" db:"numErrors"` +} + +type ServiceOverviewItem struct { + Time string `json:"time,omitempty" db:"time,omitempty"` + Timestamp int64 `json:"timestamp" db:"timestamp"` + Percentile50 float32 `json:"p50" db:"p50"` + Percentile95 float32 `json:"p95" db:"p95"` + Percentile99 float32 `json:"p99" db:"p99"` + NumCalls int `json:"numCalls" db:"numCalls"` + CallRate float32 `json:"callRate" db:"callRate"` + NumErrors int `json:"numErrors" db:"numErrors"` + ErrorRate float32 `json:"errorRate" db:"errorRate"` +} + +type SearchSpansResult struct { + Columns []string `json:"columns"` + Events [][]interface{} `json:"events"` +} + +type TraceResult struct { + Data []interface{} `json:"data" db:"data"` + Total int `json:"total" db:"total"` + Limit int `json:"limit" db:"limit"` + Offset int `json:"offset" db:"offset"` +} +type TraceResultItem struct { + TraceID string + Spans []TraceResultSpan +} +type TraceResultSpan struct { + Timestamp string `db:"timestamp"` + SpanID string `db:"spanID"` + TraceID string `db:"traceID"` + ServiceName string `db:"serviceName"` + Name string `db:"name"` + Kind int32 `db:"kind"` + DurationNano int64 `db:"durationNano"` + TagsKeys []string `db:"tagsKeys"` + TagsValues []string `db:"tagsValues"` +} + +type SearchSpanReponseItem struct { + Timestamp string `db:"timestamp"` + SpanID string `db:"spanID"` + TraceID string `db:"traceID"` + ServiceName string `db:"serviceName"` + Name string `db:"name"` + Kind int32 `db:"kind"` + References string `db:"references,omitempty"` + DurationNano int64 `db:"durationNano"` + TagsKeys []string `db:"tagsKeys"` + TagsValues []string `db:"tagsValues"` +} + +type OtelSpanRef struct { + TraceId string `json:"traceId,omitempty"` + SpanId string `json:"spanId,omitempty"` + RefType string `json:"refType,omitempty"` +} + +func (ref *OtelSpanRef) toString() string { + + retString := fmt.Sprintf(`{TraceId=%s, SpanId=%s, RefType=%s}`, ref.TraceId, ref.SpanId, ref.RefType) + + return retString +} + +func (item *SearchSpanReponseItem) GetValues() []interface{} { + + timeObj, _ := time.Parse(time.RFC3339Nano, item.Timestamp) + references := []OtelSpanRef{} + json.Unmarshal([]byte(item.References), &references) + + referencesStringArray := []string{} + for _, item := range references { + referencesStringArray = append(referencesStringArray, item.toString()) + } + + returnArray := []interface{}{int64(timeObj.UnixNano() / 1000000), item.SpanID, item.TraceID, item.ServiceName, item.Name, strconv.Itoa(int(item.Kind)), strconv.FormatInt(item.DurationNano, 10), item.TagsKeys, 
item.TagsValues, referencesStringArray} + + return returnArray +} + +type ServiceExternalItem struct { + Time string `json:"time,omitempty" db:"time,omitempty"` + Timestamp int64 `json:"timestamp,omitempty" db:"timestamp,omitempty"` + ExternalHttpUrl string `json:"externalHttpUrl,omitempty" db:"externalHttpUrl,omitempty"` + AvgDuration float32 `json:"avgDuration,omitempty" db:"avgDuration,omitempty"` + NumCalls int `json:"numCalls,omitempty" db:"numCalls,omitempty"` + CallRate float32 `json:"callRate,omitempty" db:"callRate,omitempty"` + NumErrors int `json:"numErrors" db:"numErrors"` + ErrorRate float32 `json:"errorRate" db:"errorRate"` +} + +type ServiceDBOverviewItem struct { + Time string `json:"time,omitempty" db:"time,omitempty"` + Timestamp int64 `json:"timestamp,omitempty" db:"timestamp,omitempty"` + DBSystem string `json:"dbSystem,omitempty" db:"dbSystem,omitempty"` + AvgDuration float32 `json:"avgDuration,omitempty" db:"avgDuration,omitempty"` + NumCalls int `json:"numCalls,omitempty" db:"numCalls,omitempty"` + CallRate float32 `json:"callRate,omitempty" db:"callRate,omitempty"` +} + +type ServiceMapDependencyItem struct { + SpanId string `json:"spanId,omitempty" db:"spanID,omitempty"` + ParentSpanId string `json:"parentSpanId,omitempty" db:"parentSpanID,omitempty"` + ServiceName string `json:"serviceName,omitempty" db:"serviceName,omitempty"` +} + +type UsageItem struct { + Time string `json:"time,omitempty" db:"time,omitempty"` + Timestamp int64 `json:"timestamp" db:"timestamp"` + Count int64 `json:"count" db:"count"` +} + +type TopEndpointsItem struct { + Percentile50 float32 `json:"p50" db:"p50"` + Percentile95 float32 `json:"p95" db:"p95"` + Percentile99 float32 `json:"p99" db:"p99"` + NumCalls int `json:"numCalls" db:"numCalls"` + Name string `json:"name" db:"name"` +} + +type TagItem struct { + TagKeys string `json:"tagKeys" db:"tagKeys"` + TagCount int `json:"tagCount" db:"tagCount"` +} + +type ServiceMapDependencyResponseItem struct { + Parent string `json:"parent,omitempty" db:"parent,omitempty"` + Child string `json:"child,omitempty" db:"child,omitempty"` + CallCount int `json:"callCount,omitempty" db:"callCount,omitempty"` +} + +type SpanSearchAggregatesResponseItem struct { + Timestamp int64 `json:"timestamp"` + Value float32 `json:"value"` +} diff --git a/pkg/query-service/test/clickhouse/clickhouse.go b/pkg/query-service/test/clickhouse/clickhouse.go new file mode 100644 index 0000000000..6dff517a86 --- /dev/null +++ b/pkg/query-service/test/clickhouse/clickhouse.go @@ -0,0 +1,20 @@ +package clickhouse + +type ClickHouseReader struct { + ClickHouseClientUrl string +} + +func connect() string { + return "Connected to ClickHouse" +} + +func NewSpanReader() *ClickHouseReader { + connect() + return &ClickHouseReader{ + ClickHouseClientUrl: "http://localhost:9000", + } +} + +func (chReader *ClickHouseReader) GetServices() string { + return "Hello from ClickHouse" +} diff --git a/pkg/query-service/test/druid/druid.go b/pkg/query-service/test/druid/druid.go new file mode 100644 index 0000000000..252966944a --- /dev/null +++ b/pkg/query-service/test/druid/druid.go @@ -0,0 +1,20 @@ +package druid + +type DruidReader struct { + DruidClientUrl string +} + +func connect() string { + return "Connected to Druid" +} + +func NewSpanReader() *DruidReader { + connect() + return &DruidReader{ + DruidClientUrl: "http://localhost:8888", + } +} + +func (druidReader *DruidReader) GetServices() string { + return "Hello from Druid" +} diff --git a/pkg/query-service/test/test.go 
b/pkg/query-service/test/test.go new file mode 100644 index 0000000000..755851a374 --- /dev/null +++ b/pkg/query-service/test/test.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "os" + + "go.signoz.io/query-service/test/clickhouse" + "go.signoz.io/query-service/test/druid" +) + +type StorageReader interface { + GetServices() string +} + +func main() { + storage := os.Getenv("STORAGE") + var client StorageReader + + if storage == "druid" { + client = druid.NewSpanReader() + } else if storage == "clickhouse" { + client = clickhouse.NewSpanReader() + } + + services := client.GetServices() + fmt.Println(services) +} diff --git a/yarn.lock b/yarn.lock new file mode 100644 index 0000000000..fb57ccd13a --- /dev/null +++ b/yarn.lock @@ -0,0 +1,4 @@ +# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY. +# yarn lockfile v1 + +