Ahsan Barkati 1d28ceb3d7
feat(query-service): Add cold storage support in getTTL API (#922)
* Add cold storage support in getTTL API
2022-04-01 11:22:25 +05:30

2784 lines
96 KiB
Go

package clickhouseReader
import (
"bytes"
"context"
"crypto/md5"
"database/sql"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/scrape"
"github.com/pkg/errors"
_ "github.com/ClickHouse/clickhouse-go"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/jmoiron/sqlx"
"github.com/oklog/oklog/pkg/group"
"github.com/prometheus/client_golang/prometheus"
promModel "github.com/prometheus/common/model"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/strutil"
"go.signoz.io/query-service/constants"
am "go.signoz.io/query-service/integrations/alertManager"
"go.signoz.io/query-service/model"
"go.uber.org/zap"
)
// Package-level names used to configure the ClickHouse store.
const (
// primaryNamespace and archiveNamespace are the namespace names passed to
// NewOptions by NewReader.
primaryNamespace = "clickhouse"
archiveNamespace = "clickhouse-archive"
// NOTE(review): the names below are not referenced in this part of the file;
// presumably they are the trace index table and the metrics database/tables —
// confirm against their users.
signozTraceTableName = "signoz_index"
signozMetricDBName = "signoz_metrics"
signozSampleName = "samples"
signozTSName = "time_series"
)
// Sentinel errors returned by reader methods.
var (
ErrNoOperationsTable = errors.New("no operations table supplied")
// ErrNoIndexTable is returned by GetServices when no index table is configured.
ErrNoIndexTable = errors.New("no index table supplied")
ErrStartTimeRequired = errors.New("start time is required for search queries")
)
// ClickHouseReader implements the query service's read paths. Trace
// aggregates are queried from ClickHouse (db), and PromQL queries run through
// an embedded Prometheus engine that Start wires up. Alerting rules and
// notification channels are stored in a separate local SQL database (localDB).
type ClickHouseReader struct {
db *sqlx.DB // ClickHouse connection opened by NewReader via initialize
localDB *sqlx.DB // holds the rules and notification_channels tables
operationsTable string // taken from the primary namespace options
indexTable string // span table queried by GetServices / GetServiceOverview
errorTable string // taken from the primary namespace options
queryEngine *promql.Engine // PromQL engine, set by Start
remoteStorage *remote.Storage // queryable used for PromQL queries, set by Start
ruleManager *rules.Manager // Prometheus rule manager, set by Start
promConfig *config.Config // last successfully applied Prometheus config, set by Start
alertManager am.Manager // Alertmanager client used to manage channel routes
}
// NewReader builds a ClickHouseReader. The ClickHouse data source is read
// from the ClickHouseUrl environment variable, and localDB is the local SQL
// database that stores rules and notification channels. If the ClickHouse
// connection cannot be initialised, the error is logged and the process
// exits with status 1.
func NewReader(localDB *sqlx.DB) *ClickHouseReader {
	opts := NewOptions(os.Getenv("ClickHouseUrl"), primaryNamespace, archiveNamespace)

	chDB, err := initialize(opts)
	if err != nil {
		zap.S().Error(err)
		os.Exit(1)
	}

	return &ClickHouseReader{
		db:              chDB,
		localDB:         localDB,
		alertManager:    am.New(""),
		operationsTable: opts.primary.OperationsTable,
		indexTable:      opts.primary.IndexTable,
		errorTable:      opts.primary.ErrorTable,
	}
}
// Start wires up and runs the embedded Prometheus components used by the
// query service:
//   - remote storage, read through the PromQL engine;
//   - the scrape and notifier discovery managers;
//   - the scrape manager, the notifier and the rule manager.
//
// The Prometheus configuration is read from the file named by the -config
// flag (default ./config/prometheus.yml). Once that configuration has been
// applied, the alerting rules and notification channels persisted in localDB
// are loaded.
//
// Start blocks while the components run. It terminates the process with a
// non-zero exit status if setup or any component fails.
func (r *ClickHouseReader) Start() {
	logLevel := promlog.AllowedLevel{}
	logLevel.Set("debug") // "debug" is a valid level, so the error is ignored.
	logger := promlog.New(logLevel)

	startTime := func() (int64, error) {
		return int64(promModel.Latest), nil
	}

	remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), startTime, time.Minute)

	cfg := struct {
		configFile string

		localStoragePath    string
		notifier            notifier.Options
		notifierTimeout     promModel.Duration
		forGracePeriod      promModel.Duration
		outageTolerance     promModel.Duration
		resendDelay         promModel.Duration
		tsdb                tsdb.Options
		lookbackDelta       promModel.Duration
		webTimeout          promModel.Duration
		queryTimeout        promModel.Duration
		queryConcurrency    int
		queryMaxSamples     int
		RemoteFlushDeadline promModel.Duration

		prometheusURL string

		logLevel promlog.AllowedLevel
	}{
		notifier: notifier.Options{
			Registerer: prometheus.DefaultRegisterer,
		},
	}

	flag.StringVar(&cfg.configFile, "config", "./config/prometheus.yml", "(prometheus config to read metrics)")
	flag.Parse()

	fanoutStorage := storage.NewFanout(logger, remoteStorage)
	localStorage := remoteStorage

	// Durations are built by multiplying time units. The previous
	// `time.Duration.Seconds(10)`-style method expressions evaluated
	// Duration(10).Seconds() — a tiny fraction of a second — which truncated
	// to 0 and silently disabled all of these settings.
	cfg.notifier.QueueCapacity = 10000
	cfg.notifierTimeout = promModel.Duration(10 * time.Second)
	cfg.outageTolerance = promModel.Duration(time.Hour)
	cfg.forGracePeriod = promModel.Duration(10 * time.Minute)
	cfg.resendDelay = promModel.Duration(time.Minute)

	// Named so it does not shadow the notifier package.
	notifierManager := notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))

	externalURL, err := computeExternalURL("", "0.0.0.0:3301")
	if err != nil {
		// externalURL is nil on error and must not be dereferenced here.
		fmt.Fprintln(os.Stderr, errors.Wrap(err, "parse external URL"))
		os.Exit(2)
	}

	ctxScrape, cancelScrape := context.WithCancel(context.Background())
	discoveryManagerScrape := discovery.NewManager(ctxScrape, log.With(logger, "component", "discovery manager scrape"), discovery.Name("scrape"))

	ctxNotify, cancelNotify := context.WithCancel(context.Background())
	discoveryManagerNotify := discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))

	scrapeManager := scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)

	queryEngine := promql.NewEngine(promql.EngineOpts{
		Logger:        log.With(logger, "component", "query engine"),
		Reg:           nil,
		MaxConcurrent: 20,
		MaxSamples:    50000000,
		Timeout:       2 * time.Minute,
	})

	ruleManager := rules.NewManager(&rules.ManagerOptions{
		Appendable:      fanoutStorage,
		TSDB:            localStorage,
		QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
		NotifyFunc:      sendAlerts(notifierManager, externalURL.String()),
		Context:         context.Background(),
		ExternalURL:     externalURL,
		Registerer:      prometheus.DefaultRegisterer,
		Logger:          log.With(logger, "component", "rule manager"),
		OutageTolerance: time.Duration(cfg.outageTolerance),
		ForGracePeriod:  time.Duration(cfg.forGracePeriod),
		ResendDelay:     time.Duration(cfg.resendDelay),
	})

	reloaders := []func(cfg *config.Config) error{
		remoteStorage.ApplyConfig,
		// The scrape and notifier managers need to reload before the discovery
		// managers, because they must read the most recent config when they
		// receive the new targets list.
		notifierManager.ApplyConfig,
		scrapeManager.ApplyConfig,
		func(cfg *config.Config) error {
			c := make(map[string]sd_config.ServiceDiscoveryConfig)
			for _, v := range cfg.ScrapeConfigs {
				c[v.JobName] = v.ServiceDiscoveryConfig
			}
			return discoveryManagerScrape.ApplyConfig(c)
		},
		func(cfg *config.Config) error {
			c := make(map[string]sd_config.ServiceDiscoveryConfig)
			for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
				// AlertmanagerConfigs has no unique identifier, so the config
				// hash is used as the identifier.
				b, err := json.Marshal(v)
				if err != nil {
					return err
				}
				c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
			}
			return discoveryManagerNotify.ApplyConfig(c)
		},
	}

	// sync.Once lets the channel be closed safely from different execution
	// stages (SIGTERM, or once the config has loaded).
	type closeOnce struct {
		C     chan struct{}
		once  sync.Once
		Close func()
	}
	// Closed once the initial configuration is applied; actors wait on it.
	reloadReady := &closeOnce{
		C: make(chan struct{}),
	}
	reloadReady.Close = func() {
		reloadReady.once.Do(func() {
			close(reloadReady.C)
		})
	}

	var g group.Group
	{
		// Scrape discovery manager.
		g.Add(
			func() error {
				err := discoveryManagerScrape.Run()
				level.Info(logger).Log("msg", "Scrape discovery manager stopped")
				return err
			},
			func(err error) {
				level.Info(logger).Log("msg", "Stopping scrape discovery manager...")
				cancelScrape()
			},
		)
	}
	{
		// Notify discovery manager.
		g.Add(
			func() error {
				err := discoveryManagerNotify.Run()
				level.Info(logger).Log("msg", "Notify discovery manager stopped")
				return err
			},
			func(err error) {
				level.Info(logger).Log("msg", "Stopping notify discovery manager...")
				cancelNotify()
			},
		)
	}
	{
		// Scrape manager.
		g.Add(
			func() error {
				// When the scrape manager receives a new targets list it needs
				// a valid config for each job. That depends on the config being
				// in sync with the discovery manager, so wait until it is fully
				// loaded.
				<-reloadReady.C
				err := scrapeManager.Run(discoveryManagerScrape.SyncCh())
				level.Info(logger).Log("msg", "Scrape manager stopped")
				return err
			},
			func(err error) {
				// Stop the scrape manager before closing storage so that it
				// does not write samples to a closed store.
				level.Info(logger).Log("msg", "Stopping scrape manager...")
				scrapeManager.Stop()
			},
		)
	}
	{
		// Initial configuration loading, then restoring the persisted rules
		// and channels, which both need the applied config.
		cancel := make(chan struct{})
		g.Add(
			func() error {
				promConfig, err := reloadConfig(cfg.configFile, logger, reloaders...)
				if err != nil {
					return fmt.Errorf("error loading config from %q: %s", cfg.configFile, err)
				}
				r.promConfig = promConfig
				reloadReady.Close()

				// On a read failure the returned pointer is nil, so the loop
				// must only run on success.
				if ruleItems, apiErr := r.GetRulesFromDB(); apiErr != nil {
					zap.S().Errorf("Not able to read rules from DB: %v", apiErr.Err)
				} else {
					for _, rule := range *ruleItems {
						if apiErr := r.LoadRule(rule); apiErr != nil {
							zap.S().Errorf("Not able to load rule with id=%d loaded from DB: %v", rule.Id, apiErr.Err)
						}
					}
				}

				if channels, apiErr := r.GetChannels(); apiErr != nil {
					zap.S().Errorf("Not able to read channels from DB: %v", apiErr.Err)
				} else {
					for i := range *channels {
						channel := &(*channels)[i]
						if apiErr := r.LoadChannel(channel); apiErr != nil {
							zap.S().Errorf("Not able to load channel with id=%d loaded from DB: %v", channel.Id, apiErr.Err)
						}
					}
				}

				<-cancel
				return nil
			},
			func(err error) {
				close(cancel)
			},
		)
	}
	{
		// Rule manager.
		// TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel.
		cancel := make(chan struct{})
		g.Add(
			func() error {
				<-reloadReady.C
				ruleManager.Run()
				<-cancel
				return nil
			},
			func(err error) {
				ruleManager.Stop()
				close(cancel)
			},
		)
	}
	{
		// Notifier.
		// Calling notifierManager.Stop() before ruleManager.Stop() panics if
		// the ruleManager isn't running, so this interrupt is registered after
		// the rule manager's.
		g.Add(
			func() error {
				// When the notifier receives a new targets list it needs a valid
				// config for each job. That depends on the config being in sync
				// with the discovery manager, so wait until it is fully loaded.
				<-reloadReady.C
				notifierManager.Run(discoveryManagerNotify.SyncCh())
				level.Info(logger).Log("msg", "Notifier manager stopped")
				return nil
			},
			func(err error) {
				notifierManager.Stop()
			},
		)
	}

	// These must be set before g.Run starts the actors; the config actor
	// calls LoadRule, which uses r.ruleManager.
	r.queryEngine = queryEngine
	r.remoteStorage = remoteStorage
	r.ruleManager = ruleManager

	if err := g.Run(); err != nil {
		level.Error(logger).Log("err", err)
		os.Exit(1)
	}
}
// reloadConfig parses the Prometheus configuration file at filename and
// passes the result to every reloader in rls. All reloaders run even if an
// earlier one fails; each failure is logged. If the file cannot be loaded, or
// any reloader fails, an error is returned and the config is discarded.
// Otherwise the parsed configuration is returned.
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (promConfig *config.Config, err error) {
	level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)

	conf, loadErr := config.LoadFile(filename)
	if loadErr != nil {
		return nil, fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, loadErr)
	}

	allApplied := true
	for i := range rls {
		applyErr := rls[i](conf)
		if applyErr == nil {
			continue
		}
		level.Error(logger).Log("msg", "Failed to apply configuration", "err", applyErr)
		allApplied = false
	}
	if !allApplied {
		return nil, fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
	}

	level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
	return conf, nil
}
// startsOrEndsWithQuote reports whether s begins or ends with a double-quote
// or single-quote character.
func startsOrEndsWithQuote(s string) bool {
	for _, quote := range []string{"\"", "'"} {
		if strings.HasPrefix(s, quote) || strings.HasSuffix(s, quote) {
			return true
		}
	}
	return false
}

// computeExternalURL computes a sanitized external URL from a raw input.
//
// When u is empty, it is built as http://<os hostname>:<port>/, where port is
// taken from listenAddr. URLs that begin or end with a quote are rejected.
// Trailing slashes are stripped from the path, and a non-empty path is given
// a leading slash.
func computeExternalURL(u, listenAddr string) (*url.URL, error) {
	if u == "" {
		host, hostErr := os.Hostname()
		if hostErr != nil {
			return nil, hostErr
		}
		_, port, splitErr := net.SplitHostPort(listenAddr)
		if splitErr != nil {
			return nil, splitErr
		}
		u = "http://" + host + ":" + port + "/"
	}

	if startsOrEndsWithQuote(u) {
		return nil, fmt.Errorf("URL must not begin or end with quotes")
	}

	parsed, err := url.Parse(u)
	if err != nil {
		return nil, err
	}

	prefix := strings.TrimRight(parsed.Path, "/")
	if prefix != "" && prefix[0] != '/' {
		prefix = "/" + prefix
	}
	parsed.Path = prefix
	return parsed, nil
}
// sendAlerts adapts a notifier.Manager to a rules.NotifyFunc. Each rule alert
// becomes a notifier alert with the same start time, labels and annotations.
// Its GeneratorURL points at the expression's table view on externalURL.
// EndsAt is the resolution time for resolved alerts and ValidUntil otherwise.
// Nothing is sent when there are no alerts.
func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
	return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
		if len(alerts) == 0 {
			return
		}

		generatorURL := externalURL + strutil.TableLinkForExpression(expr)
		batch := make([]*notifier.Alert, 0, len(alerts))
		for _, ra := range alerts {
			endsAt := ra.ValidUntil
			if !ra.ResolvedAt.IsZero() {
				endsAt = ra.ResolvedAt
			}
			batch = append(batch, &notifier.Alert{
				StartsAt:     ra.FiredAt,
				EndsAt:       endsAt,
				Labels:       ra.Labels,
				Annotations:  ra.Annotations,
				GeneratorURL: generatorURL,
			})
		}
		n.Send(batch...)
	}
}
// initialize opens the connection for the primary namespace described by
// options. Connection errors are wrapped with a "primary db" context message.
func initialize(options *Options) (*sqlx.DB, error) {
	primaryDB, err := connect(options.getPrimary())
	if err == nil {
		return primaryDB, nil
	}
	return nil, fmt.Errorf("error connecting to primary db: %v", err)
}
// connect validates that cfg uses a supported encoding (JSON or Proto) and
// then delegates to the namespace's Connector.
func connect(cfg *namespaceConfig) (*sqlx.DB, error) {
	switch cfg.Encoding {
	case EncodingJSON, EncodingProto:
		return cfg.Connector(cfg)
	default:
		return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto)
	}
}
// byAlertStateAndNameSorter implements sort.Interface over alerting rules.
// Rules are ordered by descending State() and, for equal states, by
// ascending Name().
type byAlertStateAndNameSorter struct {
	alerts []*AlertingRuleWithGroup
}

// Len returns the number of rules being sorted.
func (s byAlertStateAndNameSorter) Len() int { return len(s.alerts) }

// Less orders higher states first and breaks ties by name.
func (s byAlertStateAndNameSorter) Less(i, j int) bool {
	a, b := s.alerts[i], s.alerts[j]
	if stateA, stateB := a.State(), b.State(); stateA != stateB {
		return stateA > stateB
	}
	return a.Name() < b.Name()
}

// Swap exchanges the rules at positions i and j.
func (s byAlertStateAndNameSorter) Swap(i, j int) {
	s.alerts[j], s.alerts[i] = s.alerts[i], s.alerts[j]
}
// AlertingRuleWithGroup pairs a Prometheus alerting rule with the numeric id
// of the stored rule it came from. ListRulesFromProm recovers the id from the
// rule group name "<id>-groupname".
// NOTE(review): rules.AlertingRule is embedded by value, so building one copies
// the rule struct; confirm that is safe (go vet copylocks), or embed a pointer.
type AlertingRuleWithGroup struct {
rules.AlertingRule
Id int // id of the rules table row / "<id>-groupname" group
}
// GetRulesFromDB returns every alerting rule stored in the rules table of the
// local database. On failure it returns a nil pointer and an ErrorInternal
// ApiError wrapping the database error.
func (r *ClickHouseReader) GetRulesFromDB() (*[]model.RuleResponseItem, *model.ApiError) {
	// A constant query needs no fmt.Sprintf (staticcheck S1039).
	const query = "SELECT id, updated_at, data FROM rules"

	rules := []model.RuleResponseItem{}
	zap.S().Info(query)
	if err := r.localDB.Select(&rules, query); err != nil {
		// Failing to read persisted rules is an operational error, not debug noise.
		zap.S().Errorf("Error in processing sql query: %v", err)
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return &rules, nil
}
// GetRule fetches the stored rule whose numeric id is given as a string.
// A non-numeric id yields an ErrorBadData ApiError. Previously it was silently
// treated as id 0. Any database error, including no matching row, yields an
// ErrorInternal ApiError.
func (r *ClickHouseReader) GetRule(id string) (*model.RuleResponseItem, *model.ApiError) {
	idInt, err := strconv.Atoi(id)
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid rule id %q: %v", id, err)}
	}

	// Bound parameter, matching the $n placeholders used for other localDB statements.
	const query = "SELECT id, updated_at, data FROM rules WHERE id=$1"
	rule := &model.RuleResponseItem{}
	zap.S().Infof("%s [id=%d]", query, idInt)
	if err := r.localDB.Get(rule, query, idInt); err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return rule, nil
}
// ListRulesFromProm lists the alerting rules currently loaded in the rule
// manager, considering only groups named "<id>-groupname". Each rule is
// reported with its id, labels, name and state. The state is that of the
// rule's first active alert, or inactive if none is active. Results are
// sorted by descending state and then by name.
func (r *ClickHouseReader) ListRulesFromProm() (*model.AlertDiscovery, *model.ApiError) {
	groups := r.ruleManager.RuleGroups()

	alertingRulesWithGroupObjects := []*AlertingRuleWithGroup{}
	for _, group := range groups {
		groupNameParts := strings.Split(group.Name(), "-groupname")
		if len(groupNameParts) < 2 {
			continue
		}
		id, err := strconv.Atoi(groupNameParts[0])
		if err != nil {
			// Not a group created from a stored rule; reporting id 0 would be misleading.
			continue
		}
		for _, rule := range group.Rules() {
			if alertingRule, ok := rule.(*rules.AlertingRule); ok {
				alertingRulesWithGroupObjects = append(alertingRulesWithGroupObjects, &AlertingRuleWithGroup{
					AlertingRule: *alertingRule,
					Id:           id,
				})
			}
		}
	}

	alertsSorter := byAlertStateAndNameSorter{alerts: alertingRulesWithGroupObjects}
	sort.Sort(alertsSorter)

	alerts := make([]*model.AlertingRuleResponse, 0, len(alertsSorter.alerts))
	for _, alertingRule := range alertsSorter.alerts {
		resp := &model.AlertingRuleResponse{
			Labels: alertingRule.Labels(),
			Name:   alertingRule.Name(),
			Id:     alertingRule.Id,
		}
		// Take a single snapshot. Rule evaluation can resolve alerts
		// concurrently, so a second ActiveAlerts() call could return an empty
		// slice, and indexing it would panic.
		activeAlerts := alertingRule.ActiveAlerts()
		if len(activeAlerts) == 0 {
			resp.State = rules.StateInactive.String()
		} else {
			resp.State = activeAlerts[0].State.String()
		}
		alerts = append(alerts, resp)
	}

	return &model.AlertDiscovery{Alerts: alerts}, nil
}
// LoadRule adds a stored rule to the rule manager as the group
// "<id>-groupname". It uses the global evaluation interval from the applied
// Prometheus configuration. Manager errors are returned as ErrorInternal.
func (r *ClickHouseReader) LoadRule(rule model.RuleResponseItem) *model.ApiError {
	name := fmt.Sprintf("%d-groupname", rule.Id)
	interval := time.Duration(r.promConfig.GlobalConfig.EvaluationInterval)
	if err := r.ruleManager.AddGroup(interval, rule.Data, name); err != nil {
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return nil
}
// LoadChannel registers a stored notification channel with Alertmanager by
// POSTing the channel's JSON to the v1/receivers endpoint.
//
// The data is first decoded into an am.Receiver so malformed entries are
// rejected with ErrorBadData. Transport failures and non-2xx responses are
// returned as ErrorInternal.
func (r *ClickHouseReader) LoadChannel(channel *model.ChannelItem) *model.ApiError {
	receiver := &am.Receiver{}
	if err := json.Unmarshal([]byte(channel.Data), receiver); err != nil {
		return &model.ApiError{Typ: model.ErrorBadData, Err: err}
	}

	response, err := http.Post(constants.GetAlertManagerApiPrefix()+"v1/receivers", "application/json", bytes.NewBuffer([]byte(channel.Data)))
	if err != nil {
		zap.S().Errorf("Error in getting response of API call to alertmanager/v1/receivers: %v", err)
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	// The body was previously never closed, leaking a connection per call.
	defer response.Body.Close()

	if response.StatusCode > 299 {
		// Best effort: the body is only used to enrich the error message.
		responseData, _ := ioutil.ReadAll(response.Body)
		err := fmt.Errorf("Error in getting 2xx response in API call to alertmanager/v1/receivers: %s %s", response.Status, string(responseData))
		zap.S().Error(err)
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return nil
}
// GetChannel fetches the notification channel whose numeric id is given as a
// string. A non-numeric id yields an ErrorBadData ApiError. Previously it was
// silently treated as id 0. Any database error, including no matching row,
// yields an ErrorInternal ApiError.
func (r *ClickHouseReader) GetChannel(id string) (*model.ChannelItem, *model.ApiError) {
	idInt, err := strconv.Atoi(id)
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid channel id %q: %v", id, err)}
	}

	const query = "SELECT id, created_at, updated_at, name, type, data FROM notification_channels WHERE id=$1"
	channel := model.ChannelItem{}
	if err := r.localDB.Get(&channel, query, idInt); err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return &channel, nil
}
// DeleteChannel deletes the notification channel with the given numeric id
// from the local database and removes its route from Alertmanager. If
// Alertmanager rejects the removal, the database delete is rolled back.
// Errors:
//   - a non-numeric id yields ErrorBadData;
//   - a missing channel returns GetChannel's error;
//   - database failures yield ErrorInternal.
func (r *ClickHouseReader) DeleteChannel(id string) *model.ApiError {
	idInt, err := strconv.Atoi(id)
	if err != nil {
		return &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid channel id %q: %v", id, err)}
	}

	channelToDelete, apiErrorObj := r.GetChannel(id)
	if apiErrorObj != nil {
		return apiErrorObj
	}

	tx, err := r.localDB.Begin()
	if err != nil {
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if _, err := tx.Exec(`DELETE FROM notification_channels WHERE id=$1;`, idInt); err != nil {
		zap.S().Errorf("Error in executing DELETE on notification_channels: %v", err)
		tx.Rollback()
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if apiError := r.alertManager.DeleteRoute(channelToDelete.Name); apiError != nil {
		tx.Rollback()
		return apiError
	}

	// NOTE(review): if this commit fails, the Alertmanager route is already
	// gone while the row remains.
	if err := tx.Commit(); err != nil {
		zap.S().Errorf("Error in committing transaction for DELETE command to notification_channels: %v", err)
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return nil
}
// GetChannels returns every notification channel stored in the local
// database. On failure it returns a nil pointer and an ErrorInternal ApiError
// wrapping the database error.
func (r *ClickHouseReader) GetChannels() (*[]model.ChannelItem, *model.ApiError) {
	// A constant query needs no fmt.Sprintf (staticcheck S1039).
	const query = "SELECT id, created_at, updated_at, name, type, data FROM notification_channels"

	channels := []model.ChannelItem{}
	if err := r.localDB.Select(&channels, query); err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return &channels, nil
}
// getChannelType returns the name of the first integration configured on
// receiver, checked in this order: email, opsgenie, pagerduty, pushover, sns,
// slack, victorops, webhook, wechat. It returns "" when none is set.
func getChannelType(receiver *am.Receiver) string {
	switch {
	case receiver.EmailConfigs != nil:
		return "email"
	case receiver.OpsGenieConfigs != nil:
		return "opsgenie"
	case receiver.PagerdutyConfigs != nil:
		return "pagerduty"
	case receiver.PushoverConfigs != nil:
		return "pushover"
	case receiver.SNSConfigs != nil:
		return "sns"
	case receiver.SlackConfigs != nil:
		return "slack"
	case receiver.VictorOpsConfigs != nil:
		return "victorops"
	case receiver.WebhookConfigs != nil:
		return "webhook"
	case receiver.WechatConfigs != nil:
		return "wechat"
	default:
		return ""
	}
}
// EditChannel replaces the configuration of the notification channel with
// the given numeric id, both in the local database and in Alertmanager. The
// channel name cannot change. If Alertmanager rejects the new route, the
// database update is rolled back. Returns receiver on success.
func (r *ClickHouseReader) EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) {
	idInt, err := strconv.Atoi(id)
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid channel id %q: %v", id, err)}
	}

	channel, apiErrObj := r.GetChannel(id)
	if apiErrObj != nil {
		return nil, apiErrObj
	}
	if channel.Name != receiver.Name {
		return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("channel name cannot be changed")}
	}

	// Serialize before opening the transaction. The marshal error was
	// previously ignored, which would have stored empty data.
	receiverString, err := json.Marshal(receiver)
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	channelType := getChannelType(receiver)

	tx, err := r.localDB.Begin()
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if _, err := tx.Exec(`UPDATE notification_channels SET updated_at=$1, type=$2, data=$3 WHERE id=$4;`, time.Now(), channelType, string(receiverString), idInt); err != nil {
		zap.S().Errorf("Error in executing UPDATE on notification_channels: %v", err)
		tx.Rollback()
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if apiError := r.alertManager.EditRoute(receiver); apiError != nil {
		tx.Rollback()
		return nil, apiError
	}

	if err := tx.Commit(); err != nil {
		zap.S().Errorf("Error in committing transaction for UPDATE to notification_channels: %v", err)
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return receiver, nil
}
// CreateChannel stores a new notification channel built from receiver and
// registers it as a route in Alertmanager. If Alertmanager rejects the route,
// the insert is rolled back. Returns receiver on success.
func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) {
	// Serialize before opening the transaction. The marshal error was
	// previously ignored, which would have stored empty data.
	receiverString, err := json.Marshal(receiver)
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	channelType := getChannelType(receiver)

	tx, err := r.localDB.Begin()
	if err != nil {
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	// TODO: check if the channel name already exists, raise an error if so.
	now := time.Now() // created_at and updated_at start out identical
	if _, err := tx.Exec(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`, now, now, receiver.Name, channelType, string(receiverString)); err != nil {
		zap.S().Errorf("Error in executing INSERT to notification_channels: %v", err)
		tx.Rollback()
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if apiError := r.alertManager.AddRoute(receiver); apiError != nil {
		tx.Rollback()
		return nil, apiError
	}

	if err := tx.Commit(); err != nil {
		zap.S().Errorf("Error in committing transaction for INSERT to notification_channels: %v", err)
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return receiver, nil
}
// CreateRule stores a new rule definition and registers it with the rule
// manager as the group "<new row id>-groupname". If the id cannot be
// determined or the rule manager rejects the rule, the insert is rolled back.
func (r *ClickHouseReader) CreateRule(rule string) *model.ApiError {
	tx, err := r.localDB.Begin()
	if err != nil {
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	result, err := tx.Exec(`INSERT into rules (updated_at, data) VALUES($1,$2);`, time.Now(), rule)
	if err != nil {
		zap.S().Errorf("Error in executing INSERT to rules: %v", err)
		tx.Rollback()
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	// The group name must match the row id. Ignoring this error used to
	// register the rule under "0-groupname".
	lastInsertId, err := result.LastInsertId()
	if err != nil {
		zap.S().Errorf("Error in reading id of inserted rule: %v", err)
		tx.Rollback()
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	groupName := fmt.Sprintf("%d-groupname", lastInsertId)
	if err := r.ruleManager.AddGroup(time.Duration(r.promConfig.GlobalConfig.EvaluationInterval), rule, groupName); err != nil {
		tx.Rollback()
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if err := tx.Commit(); err != nil {
		zap.S().Errorf("Error in committing transaction for INSERT to rules: %v", err)
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return nil
}
// EditRule replaces the definition of the rule with the given numeric id,
// both in the rules table and in the rule manager group "<id>-groupname". If
// the rule manager rejects the new definition, the update is rolled back. A
// non-numeric id yields ErrorBadData; other failures yield ErrorInternal.
func (r *ClickHouseReader) EditRule(rule string, id string) *model.ApiError {
	idInt, err := strconv.Atoi(id)
	if err != nil {
		return &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid rule id %q: %v", id, err)}
	}

	tx, err := r.localDB.Begin()
	if err != nil {
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if _, err := tx.Exec(`UPDATE rules SET updated_at=$1, data=$2 WHERE id=$3;`, time.Now(), rule, idInt); err != nil {
		zap.S().Errorf("Error in executing UPDATE to rules: %v", err)
		tx.Rollback()
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	groupName := fmt.Sprintf("%d-groupname", idInt)
	if err := r.ruleManager.EditGroup(time.Duration(r.promConfig.GlobalConfig.EvaluationInterval), rule, groupName); err != nil {
		tx.Rollback()
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if err := tx.Commit(); err != nil {
		zap.S().Errorf("Error in committing transaction for UPDATE to rules: %v", err)
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return nil
}
// DeleteRule deletes the rule with the given numeric id from the rules table
// and removes its group "<id>-groupname" from the rule manager. If the rule
// manager reports an error, the delete is rolled back. A non-numeric id yields
// ErrorBadData; other failures yield ErrorInternal.
func (r *ClickHouseReader) DeleteRule(id string) *model.ApiError {
	idInt, err := strconv.Atoi(id)
	if err != nil {
		return &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("invalid rule id %q: %v", id, err)}
	}

	tx, err := r.localDB.Begin()
	if err != nil {
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	// Every failure path below rolls back. Previously, a failed Prepare
	// returned with the transaction still open.
	if _, err := tx.Exec(`DELETE FROM rules WHERE id=$1;`, idInt); err != nil {
		zap.S().Errorf("Error in executing DELETE to rules: %v", err)
		tx.Rollback()
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	groupName := fmt.Sprintf("%d-groupname", idInt)
	rule := "" // DeleteGroup takes a rule definition; none is needed to delete
	if err := r.ruleManager.DeleteGroup(time.Duration(r.promConfig.GlobalConfig.EvaluationInterval), rule, groupName); err != nil {
		tx.Rollback()
		zap.S().Errorf("Error in deleting rule from rulemanager: %v", err)
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	if err := tx.Commit(); err != nil {
		zap.S().Errorf("Error in committing transaction for deleting rules: %v", err)
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return nil
}
// GetInstantQueryMetricsResult evaluates a PromQL instant query at
// queryParams.Time against the remote storage. Query stats are included only
// when queryParams.Stats is non-empty. A query that fails to parse yields an
// ErrorBadData ApiError. Evaluation errors are reported in the returned
// Result, not as an ApiError.
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
	qry, err := r.queryEngine.NewInstantQuery(r.remoteStorage, queryParams.Query, queryParams.Time)
	if err != nil {
		// Keyed fields (go vet composites).
		return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
	}
	// Runs after the return values are computed, matching the previous
	// explicit Close; it now also runs if Exec panics.
	defer qry.Close()

	res := qry.Exec(ctx)

	var qs *stats.QueryStats
	if queryParams.Stats != "" {
		qs = stats.NewQueryStats(qry.Stats())
	}
	return res, qs, nil
}
// GetQueryRangeResult evaluates a PromQL range query from query.Start to
// query.End at query.Step resolution against the remote storage. Query stats
// are included only when query.Stats is non-empty. A query that fails to
// parse yields an ErrorBadData ApiError.
func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
	qry, err := r.queryEngine.NewRangeQuery(r.remoteStorage, query.Query, query.Start, query.End, query.Step)
	if err != nil {
		// Keyed fields (go vet composites).
		return nil, nil, &model.ApiError{Typ: model.ErrorBadData, Err: err}
	}
	// Runs after the return values are computed, matching the previous
	// explicit Close; it now also runs if Exec panics.
	defer qry.Close()

	res := qry.Exec(ctx)

	var qs *stats.QueryStats
	if query.Stats != "" {
		qs = stats.NewQueryStats(qry.Stats())
	}
	return res, qs, nil
}
// GetServices returns per-service aggregates of spans with kind='2' whose
// timestamp lies in [queryParams.Start, queryParams.End]. For each service it
// reports:
//   - p99 and average duration, and number of calls;
//   - error count (statusCode >= 500 or statusCode = 2);
//   - 4xx count (400 <= statusCode < 500);
//   - call, 4xx and error rates per queryParams.Period.
//
// It returns ErrNoIndexTable when no index table is configured.
func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceItem, error) {
	if r.indexTable == "" {
		return nil, ErrNoIndexTable
	}

	// Bounds are nanosecond timestamps, identical for all three queries.
	start := strconv.FormatInt(queryParams.Start.UnixNano(), 10)
	end := strconv.FormatInt(queryParams.End.UnixNano(), 10)

	serviceItems := []model.ServiceItem{}
	query := fmt.Sprintf("SELECT serviceName, quantile(0.99)(durationNano) as p99, avg(durationNano) as avgDuration, count(*) as numCalls FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' GROUP BY serviceName ORDER BY p99 DESC", r.indexTable, start, end)
	err := r.db.Select(&serviceItems, query)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query")
	}

	// Error counts per service.
	serviceErrorItems := []model.ServiceItem{}
	query = fmt.Sprintf("SELECT serviceName, count(*) as numErrors FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND (statusCode>=500 OR statusCode=2) GROUP BY serviceName", r.indexTable, start, end)
	err = r.db.Select(&serviceErrorItems, query)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query")
	}
	m5xx := make(map[string]int, len(serviceErrorItems))
	for _, item := range serviceErrorItems {
		m5xx[item.ServiceName] = item.NumErrors
	}

	// 4xx counts per service.
	service4xxItems := []model.ServiceItem{}
	query = fmt.Sprintf("SELECT serviceName, count(*) as num4xx FROM %s WHERE timestamp>='%s' AND timestamp<='%s' AND kind='2' AND statusCode>=400 AND statusCode<500 GROUP BY serviceName", r.indexTable, start, end)
	err = r.db.Select(&service4xxItems, query)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query")
	}
	m4xx := make(map[string]int, len(service4xxItems))
	for _, item := range service4xxItems {
		// These counts were previously written into m5xx. That overwrote the
		// error counts and left Num4XX always zero.
		m4xx[item.ServiceName] = item.Num4XX
	}

	for i := range serviceItems {
		item := &serviceItems[i]
		if val, ok := m5xx[item.ServiceName]; ok {
			item.NumErrors = val
		}
		if val, ok := m4xx[item.ServiceName]; ok {
			item.Num4XX = val
		}
		// A zero period would produce +Inf/NaN rates, which encoding/json
		// cannot marshal. In that case the rates are left at zero.
		if queryParams.Period > 0 {
			period := float32(queryParams.Period)
			item.CallRate = float32(item.NumCalls) / period
			item.FourXXRate = float32(item.Num4XX) / period
			item.ErrorRate = float32(item.NumErrors) / period
		}
	}
	return &serviceItems, nil
}
// GetServiceOverview returns per-bucket latency percentiles (p50/p95/p99 of
// durationNano), call counts and error counts for one service's kind='2' spans
// with timestamp in [Start, End]. Buckets are StepSeconds/60 minutes wide and
// ordered newest first.
//
// A span counts as an error when statusCode >= 500 or statusCode = 2.
// ErrorRate is NumErrors as a percentage of NumCalls; CallRate is NumCalls
// divided by StepSeconds. The raw Time string is replaced by a UnixNano
// Timestamp in the result.
func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) {
	interval := strconv.Itoa(int(queryParams.StepSeconds / 60))
	start := strconv.FormatInt(queryParams.Start.UnixNano(), 10)
	end := strconv.FormatInt(queryParams.End.UnixNano(), 10)

	// The time bounds and the caller-supplied service name are bound as
	// parameters so they cannot alter the SQL; only the integer interval and the
	// configured table name are interpolated.
	serviceOverviewItems := []model.ServiceOverviewItem{}
	query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, quantile(0.99)(durationNano) as p99, quantile(0.95)(durationNano) as p95,quantile(0.50)(durationNano) as p50, count(*) as numCalls FROM %s WHERE timestamp>=? AND timestamp<=? AND kind='2' AND serviceName=? GROUP BY time ORDER BY time DESC", interval, r.indexTable)
	err := r.db.Select(&serviceOverviewItems, query, start, end, queryParams.ServiceName)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query: %w", err)
	}

	serviceErrorItems := []model.ServiceErrorItem{}
	query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, count(*) as numErrors FROM %s WHERE timestamp>=? AND timestamp<=? AND kind='2' AND serviceName=? AND (statusCode>=500 OR statusCode=2) GROUP BY time ORDER BY time DESC", interval, r.indexTable)
	err = r.db.Select(&serviceErrorItems, query, start, end, queryParams.ServiceName)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query: %w", err)
	}

	// Error counts keyed by bucket start (UnixNano) so they can be joined onto
	// the overview buckets below.
	errorsByBucket := make(map[int64]int, len(serviceErrorItems))
	for j := range serviceErrorItems {
		// Parse errors are ignored; such a bucket simply fails to match.
		timeObj, _ := time.Parse(time.RFC3339Nano, serviceErrorItems[j].Time)
		errorsByBucket[timeObj.UnixNano()] = serviceErrorItems[j].NumErrors
	}
	for i := range serviceOverviewItems {
		timeObj, _ := time.Parse(time.RFC3339Nano, serviceOverviewItems[i].Time)
		serviceOverviewItems[i].Timestamp = timeObj.UnixNano()
		serviceOverviewItems[i].Time = ""
		if val, ok := errorsByBucket[serviceOverviewItems[i].Timestamp]; ok {
			serviceOverviewItems[i].NumErrors = val
		}
		serviceOverviewItems[i].ErrorRate = float32(serviceOverviewItems[i].NumErrors) * 100 / float32(serviceOverviewItems[i].NumCalls)
		serviceOverviewItems[i].CallRate = float32(serviceOverviewItems[i].NumCalls) / float32(queryParams.StepSeconds)
	}
	return &serviceOverviewItems, nil
}
// SearchSpans returns up to 100 spans, newest first, whose timestamp lies in
// [queryParams.Start, queryParams.End] and which satisfy every optional filter
// set on queryParams: service name, operation name, kind, duration bounds and
// tags.
//
// Tag filters accept the "equals", "contains", "regex" and "isnotnull"
// operators. The tag error=true is special-cased to match spans tagged
// error:true or carrying an error status code. Any other operator is an error.
// The result holds a single SearchSpansResult whose Events rows follow Columns.
func (r *ClickHouseReader) SearchSpans(ctx context.Context, queryParams *model.SpanSearchParams) (*[]model.SearchSpansResult, error) {
	query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable)
	args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}

	// addCond ANDs one predicate onto the query and binds the values for its
	// placeholders, keeping clause text and args in lockstep.
	addCond := func(clause string, values ...interface{}) {
		query += " AND " + clause
		args = append(args, values...)
	}

	if len(queryParams.ServiceName) != 0 {
		addCond("serviceName = ?", queryParams.ServiceName)
	}
	if len(queryParams.OperationName) != 0 {
		addCond("name = ?", queryParams.OperationName)
	}
	if len(queryParams.Kind) != 0 {
		addCond("kind = ?", queryParams.Kind)
	}
	if len(queryParams.MinDuration) != 0 {
		addCond("durationNano >= ?", queryParams.MinDuration)
	}
	if len(queryParams.MaxDuration) != 0 {
		addCond("durationNano <= ?", queryParams.MaxDuration)
	}

	for _, tag := range queryParams.Tags {
		if tag.Key == "error" && tag.Value == "true" {
			addCond("( has(tags, 'error:true') OR statusCode>=500 OR statusCode=2)")
			continue
		}
		switch tag.Operator {
		case "equals":
			addCond("has(tags, ?)", fmt.Sprintf("%s:%s", tag.Key, tag.Value))
		case "contains":
			addCond("tagsValues[indexOf(tagsKeys, ?)] ILIKE ?", tag.Key, fmt.Sprintf("%%%s%%", tag.Value))
		case "regex":
			addCond("match(tagsValues[indexOf(tagsKeys, ?)], ?)", tag.Key, tag.Value)
		case "isnotnull":
			addCond("has(tagsKeys, ?)", tag.Key)
		default:
			return nil, fmt.Errorf("Tag Operator %s not supported", tag.Operator)
		}
	}

	query += " ORDER BY timestamp DESC LIMIT 100"

	var rows []model.SearchSpanReponseItem
	err := r.db.Select(&rows, query, args...)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query")
	}

	events := make([][]interface{}, len(rows))
	for i := range rows {
		events[i] = rows[i].GetValues()
	}
	result := []model.SearchSpansResult{{
		Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues"},
		Events:  events,
	}}
	return &result, nil
}
// buildFilterArrayQuery appends to *query a parenthesised OR-group matching
// filter against each entry of params, e.g. " AND (f=? OR f=?)". When filter
// is a key of excludeMap the group is negated (" AND NOT (f=? OR f=?)").
// One placeholder value per entry of params is appended to args, and the
// extended args slice is returned. Empty params leaves *query untouched.
// ctx is currently unused.
func buildFilterArrayQuery(ctx context.Context, excludeMap map[string]struct{}, params []string, filter string, query *string, args []interface{}) []interface{} {
	if len(params) == 0 {
		return args
	}

	opening := " AND ("
	if _, excluded := excludeMap[filter]; excluded {
		opening = " AND NOT ("
	}
	clause := fmt.Sprintf("%s=?", filter)

	*query += opening + clause
	args = append(args, params[0])
	for _, value := range params[1:] {
		*query += " OR " + clause
		args = append(args, value)
	}
	*query += ")"
	return args
}
// GetSpanFilters computes, for each filter named in queryParams.GetFilters,
// aggregate values over spans with timestamp in [Start, End] that match the
// other filters in queryParams:
//   - attribute filters (serviceName, httpCode, httpRoute, httpUrl, httpMethod,
//     httpHost, operation, component): count of spans per non-empty value;
//   - "status": counts of ok and error spans (hasError = 1 is "error");
//   - "duration": min and max durationNano.
//
// Filters named in queryParams.Exclude are negated rather than applied (see
// buildFilterArrayQuery and getStatusFilters). An unknown filter name yields an
// ErrorBadData ApiError; a failed query yields ErrorExec with the cause wrapped.
func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *model.SpanFilterParams) (*model.SpanFiltersResponse, *model.ApiError) {
	var query string
	excludeMap := make(map[string]struct{})
	for _, e := range queryParams.Exclude {
		// An exclusion of OperationRequest is recorded under OperationDB, the
		// column the Operation filter below is applied to.
		if e == constants.OperationRequest {
			excludeMap[constants.OperationDB] = struct{}{}
			continue
		}
		excludeMap[e] = struct{}{}
	}
	// The first two args bind the "timestamp >= ? AND timestamp <= ?" bounds that
	// every query below starts with; filter args follow in clause order.
	args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}
	if len(queryParams.ServiceName) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
	}
	if len(queryParams.HttpRoute) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
	}
	if len(queryParams.HttpCode) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpCode, constants.HttpCode, &query, args)
	}
	if len(queryParams.HttpHost) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
	}
	if len(queryParams.HttpMethod) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
	}
	if len(queryParams.HttpUrl) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
	}
	if len(queryParams.Component) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Component, constants.Component, &query, args)
	}
	if len(queryParams.Operation) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
	}
	if len(queryParams.MinDuration) != 0 {
		query = query + " AND durationNano >= ?"
		args = append(args, queryParams.MinDuration)
	}
	if len(queryParams.MaxDuration) != 0 {
		query = query + " AND durationNano <= ?"
		args = append(args, queryParams.MaxDuration)
	}
	query = getStatusFilters(query, queryParams.Status, excludeMap)

	// selectRows runs q with the shared filter args and scans the rows into dest.
	selectRows := func(dest interface{}, q string) *model.ApiError {
		zap.S().Debug(q)
		if err := r.db.Select(dest, q, args...); err != nil {
			zap.S().Debug("Error in processing sql query: ", err)
			return &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query: %w", err)}
		}
		return nil
	}
	// countByQuery builds the "count spans per distinct value of column" query
	// with the shared filters applied.
	countByQuery := func(column string) string {
		return fmt.Sprintf("SELECT %s, count() as count FROM %s WHERE timestamp >= ? AND timestamp <= ?", column, r.indexTable) + query + " GROUP BY " + column
	}

	traceFilterReponse := model.SpanFiltersResponse{
		Status:      map[string]int{},
		Duration:    map[string]int{},
		ServiceName: map[string]int{},
		Operation:   map[string]int{},
		HttpCode:    map[string]int{},
		HttpMethod:  map[string]int{},
		HttpUrl:     map[string]int{},
		HttpRoute:   map[string]int{},
		HttpHost:    map[string]int{},
		Component:   map[string]int{},
	}
	for _, e := range queryParams.GetFilters {
		switch e {
		case "serviceName":
			var dBResponse []model.DBResponseServiceName
			if apiErr := selectRows(&dBResponse, countByQuery("serviceName")); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				if row.ServiceName != "" {
					traceFilterReponse.ServiceName[row.ServiceName] = row.Count
				}
			}
		case "httpCode":
			var dBResponse []model.DBResponseHttpCode
			if apiErr := selectRows(&dBResponse, countByQuery("httpCode")); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				if row.HttpCode != "" {
					traceFilterReponse.HttpCode[row.HttpCode] = row.Count
				}
			}
		case "httpRoute":
			var dBResponse []model.DBResponseHttpRoute
			if apiErr := selectRows(&dBResponse, countByQuery("httpRoute")); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				if row.HttpRoute != "" {
					traceFilterReponse.HttpRoute[row.HttpRoute] = row.Count
				}
			}
		case "httpUrl":
			var dBResponse []model.DBResponseHttpUrl
			if apiErr := selectRows(&dBResponse, countByQuery("httpUrl")); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				if row.HttpUrl != "" {
					traceFilterReponse.HttpUrl[row.HttpUrl] = row.Count
				}
			}
		case "httpMethod":
			var dBResponse []model.DBResponseHttpMethod
			if apiErr := selectRows(&dBResponse, countByQuery("httpMethod")); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				if row.HttpMethod != "" {
					traceFilterReponse.HttpMethod[row.HttpMethod] = row.Count
				}
			}
		case "httpHost":
			var dBResponse []model.DBResponseHttpHost
			if apiErr := selectRows(&dBResponse, countByQuery("httpHost")); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				if row.HttpHost != "" {
					traceFilterReponse.HttpHost[row.HttpHost] = row.Count
				}
			}
		case "operation":
			// The operation filter is stored in the "name" column.
			var dBResponse []model.DBResponseOperation
			if apiErr := selectRows(&dBResponse, countByQuery("name")); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				if row.Operation != "" {
					traceFilterReponse.Operation[row.Operation] = row.Count
				}
			}
		case "component":
			var dBResponse []model.DBResponseComponent
			if apiErr := selectRows(&dBResponse, countByQuery("component")); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				if row.Component.String != "" {
					traceFilterReponse.Component[row.Component.String] = row.Count
				}
			}
		case "status":
			var errorRows []model.DBResponseErrors
			errorQuery := fmt.Sprintf("SELECT COUNT(*) as numErrors FROM %s WHERE timestamp >= ? AND timestamp <= ? AND hasError = 1", r.indexTable) + query
			if apiErr := selectRows(&errorRows, errorQuery); apiErr != nil {
				return nil, apiErr
			}
			var totalRows []model.DBResponseTotal
			totalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + query
			if apiErr := selectRows(&totalRows, totalQuery); apiErr != nil {
				return nil, apiErr
			}
			// An aggregate without GROUP BY is expected to return one row; guard so
			// an unexpected empty result is an error rather than an index panic.
			if len(errorRows) == 0 || len(totalRows) == 0 {
				return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query: empty count result")}
			}
			traceFilterReponse.Status = map[string]int{"ok": totalRows[0].NumTotal - errorRows[0].NumErrors, "error": errorRows[0].NumErrors}
		case "duration":
			var dBResponse []model.DBResponseMinMaxDuration
			durationQuery := fmt.Sprintf("SELECT min(durationNano), max(durationNano) FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable) + query
			if apiErr := selectRows(&dBResponse, durationQuery); apiErr != nil {
				return nil, apiErr
			}
			for _, row := range dBResponse {
				traceFilterReponse.Duration["minDuration"] = row.MinDuration
				traceFilterReponse.Duration["maxDuration"] = row.MaxDuration
			}
		default:
			return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("filter type: %s not supported", e)}
		}
	}
	return &traceFilterReponse, nil
}
// getStatusFilters appends a hasError predicate to query when exactly one
// status ("ok" or "error") is selected in statusParams. Status has only two
// values, so selecting both is equivalent to selecting neither, and either
// case adds nothing. Any other single value also adds nothing.
//
// When "status" is a key of excludeMap the selection is inverted: excluding
// "error" keeps hasError = 0 spans and excluding "ok" keeps hasError = 1 spans.
func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string {
	if len(statusParams) != 1 {
		return query
	}

	var wantErrors bool
	switch statusParams[0] {
	case "error":
		wantErrors = true
	case "ok":
		wantErrors = false
	default:
		return query
	}

	if _, excluded := excludeMap["status"]; excluded {
		wantErrors = !wantErrors
	}
	if wantErrors {
		return query + " AND hasError = 1"
	}
	return query + " AND hasError = 0"
}
// GetFilteredSpans returns one page of spans with timestamp in [Start, End]
// that match the filters in queryParams, plus the total number of matching
// spans.
//
// Attribute filters are built with buildFilterArrayQuery (negated when named
// in queryParams.Exclude) and status via getStatusFilters. Tag filters accept
// these operators:
//   - "in" / "not in": any of key:value;
//   - "regex": exactly one value;
//   - "isnotnull": key present.
//
// Order ("ascending"/"descending"), Limit and Offset apply only to the page of
// spans, not to TotalSpans. Invalid tag filters yield ErrorBadData; query
// failures yield ErrorExec with the cause wrapped.
func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) {
	baseQuery := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, durationNano, httpCode, httpMethod FROM %s WHERE timestamp >= ? AND timestamp <= ?", r.indexTable)
	excludeMap := make(map[string]struct{})
	for _, e := range queryParams.Exclude {
		// An exclusion of OperationRequest is recorded under OperationDB, the
		// column the Operation filter below is applied to.
		if e == constants.OperationRequest {
			excludeMap[constants.OperationDB] = struct{}{}
			continue
		}
		excludeMap[e] = struct{}{}
	}
	var query string
	// The first two args bind the timestamp bounds shared by both queries below.
	args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}
	if len(queryParams.ServiceName) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
	}
	if len(queryParams.HttpRoute) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
	}
	if len(queryParams.HttpCode) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpCode, constants.HttpCode, &query, args)
	}
	if len(queryParams.HttpHost) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
	}
	if len(queryParams.HttpMethod) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
	}
	if len(queryParams.HttpUrl) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
	}
	if len(queryParams.Component) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Component, constants.Component, &query, args)
	}
	if len(queryParams.Operation) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
	}
	if len(queryParams.MinDuration) != 0 {
		query = query + " AND durationNano >= ?"
		args = append(args, queryParams.MinDuration)
	}
	if len(queryParams.MaxDuration) != 0 {
		query = query + " AND durationNano <= ?"
		args = append(args, queryParams.MaxDuration)
	}
	query = getStatusFilters(query, queryParams.Status, excludeMap)
	if len(queryParams.Kind) != 0 {
		query = query + " AND kind = ?"
		args = append(args, queryParams.Kind)
	}
	for _, item := range queryParams.Tags {
		switch item.Operator {
		case "in":
			// Single value: " AND has(tags, ?)"; several: " AND (has(...) OR ...)".
			for i, value := range item.Values {
				if i == 0 && i == len(item.Values)-1 {
					query += " AND has(tags, ?)"
				} else if i == 0 && i != len(item.Values)-1 {
					query += " AND (has(tags, ?)"
				} else if i != 0 && i == len(item.Values)-1 {
					query += " OR has(tags, ?))"
				} else {
					query += " OR has(tags, ?)"
				}
				args = append(args, fmt.Sprintf("%s:%s", item.Key, value))
			}
		case "not in":
			for i, value := range item.Values {
				if i == 0 && i == len(item.Values)-1 {
					query += " AND NOT has(tags, ?)"
				} else if i == 0 && i != len(item.Values)-1 {
					query += " AND NOT (has(tags, ?)"
				} else if i != 0 && i == len(item.Values)-1 {
					query += " OR has(tags, ?))"
				} else {
					query += " OR has(tags, ?)"
				}
				args = append(args, fmt.Sprintf("%s:%s", item.Key, value))
			}
		case "regex":
			if len(item.Values) != 1 {
				return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("Regex tag operator should only have one value")}
			}
			query = query + " AND match(tagsValues[indexOf(tagsKeys, ?)], ?)"
			args = append(args, item.Key)
			args = append(args, item.Values[0])
		case "isnotnull":
			// Key presence does not depend on Values, so the predicate is added
			// exactly once, including when Values is empty.
			query = query + " AND has(tagsKeys, ?)"
			args = append(args, item.Key)
		default:
			return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("Tag Operator %s not supported", item.Operator)}
		}
	}

	var totalSpans []model.DBResponseTotal
	totalSpansQuery := fmt.Sprintf(`SELECT count() as numTotal FROM %s WHERE timestamp >= ? AND timestamp <= ?`, r.indexTable)
	totalSpansQuery += query
	err := r.db.Select(&totalSpans, totalSpansQuery, args...)
	zap.S().Info(totalSpansQuery)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query: %w", err)}
	}
	// count() without GROUP BY is expected to return one row; guard so an
	// unexpected empty result is an error rather than an index panic.
	if len(totalSpans) == 0 {
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query: empty count result")}
	}

	switch queryParams.Order {
	case "descending":
		query = query + " ORDER BY timestamp DESC"
	case "ascending":
		query = query + " ORDER BY timestamp ASC"
	}
	if queryParams.Limit > 0 {
		query = query + " LIMIT ?"
		args = append(args, queryParams.Limit)
	}
	if queryParams.Offset > 0 {
		// due to bug in SQLx driver, using %d temporarily
		query = query + fmt.Sprintf(" OFFSET %d", queryParams.Offset)
	}

	var getFilterSpansResponseItems []model.GetFilterSpansResponseItem
	baseQuery += query
	err = r.db.Select(&getFilterSpansResponseItems, baseQuery, args...)
	zap.S().Info(baseQuery)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query: %w", err)}
	}
	getFilterSpansResponse := model.GetFilterSpansResponse{
		Spans:      getFilterSpansResponseItems,
		TotalSpans: totalSpans[0].NumTotal,
	}
	return &getFilterSpansResponse, nil
}
// GetTagFilters returns the distinct tag keys present on spans with timestamp
// in [Start, End] that match the filters in queryParams, minus the keys removed
// by excludeTags. The result is never nil, so an empty result is an empty list.
// A failed query yields an ErrorExec ApiError wrapping the cause.
func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model.TagFilterParams) (*[]model.TagFilters, *model.ApiError) {
	excludeMap := make(map[string]struct{})
	for _, e := range queryParams.Exclude {
		// An exclusion of OperationRequest is recorded under OperationDB, the
		// column the Operation filter below is applied to.
		if e == constants.OperationRequest {
			excludeMap[constants.OperationDB] = struct{}{}
			continue
		}
		excludeMap[e] = struct{}{}
	}
	var query string
	args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}
	if len(queryParams.ServiceName) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
	}
	if len(queryParams.HttpRoute) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
	}
	if len(queryParams.HttpCode) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpCode, constants.HttpCode, &query, args)
	}
	if len(queryParams.HttpHost) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
	}
	if len(queryParams.HttpMethod) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
	}
	if len(queryParams.HttpUrl) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
	}
	if len(queryParams.Component) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Component, constants.Component, &query, args)
	}
	if len(queryParams.Operation) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
	}
	if len(queryParams.MinDuration) != 0 {
		query = query + " AND durationNano >= ?"
		args = append(args, queryParams.MinDuration)
	}
	if len(queryParams.MaxDuration) != 0 {
		query = query + " AND durationNano <= ?"
		args = append(args, queryParams.MaxDuration)
	}
	query = getStatusFilters(query, queryParams.Status, excludeMap)

	tagFilters := []model.TagFilters{}
	finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagsKeys) as tagKeys FROM %s WHERE timestamp >= ? AND timestamp <= ?`, r.indexTable)
	finalQuery += query
	err := r.db.Select(&tagFilters, finalQuery, args...)
	// Log the statement actually executed, not just the filter fragment.
	zap.S().Info(finalQuery)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query: %w", err)}
	}
	tagFilters = excludeTags(ctx, tagFilters)
	if tagFilters == nil {
		tagFilters = []model.TagFilters{}
	}
	return &tagFilters, nil
}
// excludeTags returns tags without the entries whose TagKeys is one of a fixed
// set of HTTP, messaging, component and error attribute keys. The input is
// not modified. The result is always non-nil, so excluding every tag yields an
// empty slice rather than nil. ctx is currently unused.
func excludeTags(ctx context.Context, tags []model.TagFilters) []model.TagFilters {
	excludedTagsMap := map[string]struct{}{
		"http.code":           {},
		"http.route":          {},
		"http.method":         {},
		"http.url":            {},
		"http.status_code":    {},
		"http.host":           {},
		"messaging.system":    {},
		"messaging.operation": {},
		"component":           {},
		"error":               {},
	}
	newTags := make([]model.TagFilters, 0, len(tags))
	for _, tag := range tags {
		if _, excluded := excludedTagsMap[tag.TagKeys]; !excluded {
			newTags = append(newTags, tag)
		}
	}
	return newTags
}
// GetTagValues returns the distinct non-empty values of tag queryParams.TagKey
// (read from tagMap) on spans with timestamp in [Start, End] that match the
// filters in queryParams. A failed query yields an ErrorExec ApiError wrapping
// the cause.
func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.TagFilterParams) (*[]model.TagValues, *model.ApiError) {
	excludeMap := make(map[string]struct{})
	for _, e := range queryParams.Exclude {
		// An exclusion of OperationRequest is recorded under OperationDB, the
		// column the Operation filter below is applied to.
		if e == constants.OperationRequest {
			excludeMap[constants.OperationDB] = struct{}{}
			continue
		}
		excludeMap[e] = struct{}{}
	}
	var query string
	// Arg order mirrors the placeholders: TagKey for "tagMap[?]" in the SELECT,
	// then the timestamp bounds, then filter args; TagKey is appended once more
	// below for the GROUP BY.
	args := []interface{}{queryParams.TagKey, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}
	if len(queryParams.ServiceName) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
	}
	if len(queryParams.HttpRoute) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
	}
	if len(queryParams.HttpCode) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpCode, constants.HttpCode, &query, args)
	}
	if len(queryParams.HttpHost) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
	}
	if len(queryParams.HttpMethod) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
	}
	if len(queryParams.HttpUrl) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
	}
	if len(queryParams.Component) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Component, constants.Component, &query, args)
	}
	if len(queryParams.Operation) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
	}
	if len(queryParams.MinDuration) != 0 {
		query = query + " AND durationNano >= ?"
		args = append(args, queryParams.MinDuration)
	}
	if len(queryParams.MaxDuration) != 0 {
		query = query + " AND durationNano <= ?"
		args = append(args, queryParams.MaxDuration)
	}
	query = getStatusFilters(query, queryParams.Status, excludeMap)

	tagValues := []model.TagValues{}
	finalQuery := fmt.Sprintf(`SELECT tagMap[?] as tagValues FROM %s WHERE timestamp >= ? AND timestamp <= ?`, r.indexTable)
	finalQuery += query
	// The leading space is required: query can end in a bare token such as
	// "hasError = 0", which would otherwise fuse into "0GROUP".
	finalQuery += " GROUP BY tagMap[?]"
	args = append(args, queryParams.TagKey)
	err := r.db.Select(&tagValues, finalQuery, args...)
	zap.S().Info(finalQuery)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query: %w", err)}
	}

	cleanedTagValues := []model.TagValues{}
	for _, e := range tagValues {
		if e.TagValues != "" {
			cleanedTagValues = append(cleanedTagValues, e)
		}
	}
	return &cleanedTagValues, nil
}
// GetServiceDBOverview returns, per time bucket and dbSystem, the average
// durationNano and count of one service's kind='3' spans that have a non-null
// dbName, with timestamp in [Start, End]. Buckets are StepSeconds/60 minutes
// wide and ordered newest first; CallRate is NumCalls divided by StepSeconds.
// An empty result is returned as an empty, non-nil slice.
func (r *ClickHouseReader) GetServiceDBOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceDBOverviewItem, error) {
	var serviceDBOverviewItems []model.ServiceDBOverviewItem
	// serviceName and the time bounds are bound as parameters so caller-supplied
	// values cannot alter the SQL; only the integer interval is interpolated.
	query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, dbSystem FROM %s WHERE serviceName=? AND timestamp>=? AND timestamp<=? AND kind='3' AND dbName IS NOT NULL GROUP BY time, dbSystem ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable)
	err := r.db.Select(&serviceDBOverviewItems, query, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query: %w", err)
	}
	for i := range serviceDBOverviewItems {
		// A parse failure is ignored; the bucket's Timestamp is then meaningless.
		timeObj, _ := time.Parse(time.RFC3339Nano, serviceDBOverviewItems[i].Time)
		serviceDBOverviewItems[i].Timestamp = timeObj.UnixNano()
		serviceDBOverviewItems[i].Time = ""
		serviceDBOverviewItems[i].CallRate = float32(serviceDBOverviewItems[i].NumCalls) / float32(queryParams.StepSeconds)
	}
	if serviceDBOverviewItems == nil {
		serviceDBOverviewItems = []model.ServiceDBOverviewItem{}
	}
	return &serviceDBOverviewItems, nil
}
// GetServiceExternalAvgDuration returns, per time bucket, the average
// durationNano of one service's kind='3' spans that have a non-null
// externalHttpUrl, with timestamp in [Start, End]. Buckets are StepSeconds/60
// minutes wide and ordered newest first. An empty result is returned as an
// empty, non-nil slice.
func (r *ClickHouseReader) GetServiceExternalAvgDuration(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
	var serviceExternalItems []model.ServiceExternalItem
	// serviceName and the time bounds are bound as parameters so caller-supplied
	// values cannot alter the SQL; only the integer interval is interpolated.
	query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration FROM %s WHERE serviceName=? AND timestamp>=? AND timestamp<=? AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable)
	err := r.db.Select(&serviceExternalItems, query, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query: %w", err)
	}
	for i := range serviceExternalItems {
		// A parse failure is ignored; the bucket's Timestamp is then meaningless.
		timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time)
		serviceExternalItems[i].Timestamp = timeObj.UnixNano()
		serviceExternalItems[i].Time = ""
		// NOTE(review): the query does not select numCalls, so NumCalls stays at
		// its zero value and CallRate is not meaningful here — confirm intent.
		serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds)
	}
	if serviceExternalItems == nil {
		serviceExternalItems = []model.ServiceExternalItem{}
	}
	return &serviceExternalItems, nil
}
// GetServiceExternalErrors returns, per time bucket and externalHttpUrl, the
// number of erroring calls and the error rate for one service's kind='3'
// spans that have a non-null externalHttpUrl, with timestamp in [Start, End].
// A call is an error when statusCode >= 500 or statusCode = 2. ErrorRate is a
// percentage of the bucket's total calls.
//
// NumCalls and CallRate are zeroed in the result. Buckets are StepSeconds/60
// minutes wide and ordered newest first. An empty result is returned as an
// empty, non-nil slice.
func (r *ClickHouseReader) GetServiceExternalErrors(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
	interval := strconv.Itoa(int(queryParams.StepSeconds / 60))
	start := strconv.FormatInt(queryParams.Start.UnixNano(), 10)
	end := strconv.FormatInt(queryParams.End.UnixNano(), 10)

	// serviceName and the time bounds are bound as parameters so caller-supplied
	// values cannot alter the SQL; only the integer interval is interpolated.
	var serviceExternalErrorItems []model.ServiceExternalItem
	query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName=? AND timestamp>=? AND timestamp<=? AND kind='3' AND externalHttpUrl IS NOT NULL AND (statusCode >= 500 OR statusCode=2) GROUP BY time, externalHttpUrl ORDER BY time DESC", interval, r.indexTable)
	err := r.db.Select(&serviceExternalErrorItems, query, queryParams.ServiceName, start, end)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query: %w", err)
	}

	var serviceExternalTotalItems []model.ServiceExternalItem
	queryTotal := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName=? AND timestamp>=? AND timestamp<=? AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", interval, r.indexTable)
	errTotal := r.db.Select(&serviceExternalTotalItems, queryTotal, queryParams.ServiceName, start, end)
	zap.S().Info(queryTotal)
	if errTotal != nil {
		zap.S().Debug("Error in processing sql query: ", errTotal)
		return nil, fmt.Errorf("Error in processing sql query: %w", errTotal)
	}

	// Error counts keyed by "<bucket start UnixNano>-<externalHttpUrl>" so they
	// can be joined onto the total-call buckets.
	errorsByKey := make(map[string]int, len(serviceExternalErrorItems))
	for j := range serviceExternalErrorItems {
		timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalErrorItems[j].Time)
		errorsByKey[strconv.FormatInt(timeObj.UnixNano(), 10)+"-"+serviceExternalErrorItems[j].ExternalHttpUrl] = serviceExternalErrorItems[j].NumCalls
	}
	for i := range serviceExternalTotalItems {
		timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalTotalItems[i].Time)
		serviceExternalTotalItems[i].Timestamp = timeObj.UnixNano()
		serviceExternalTotalItems[i].Time = ""
		if val, ok := errorsByKey[strconv.FormatInt(serviceExternalTotalItems[i].Timestamp, 10)+"-"+serviceExternalTotalItems[i].ExternalHttpUrl]; ok {
			serviceExternalTotalItems[i].NumErrors = val
			serviceExternalTotalItems[i].ErrorRate = float32(serviceExternalTotalItems[i].NumErrors) * 100 / float32(serviceExternalTotalItems[i].NumCalls)
		}
		// Only error figures are reported; call figures are cleared.
		serviceExternalTotalItems[i].CallRate = 0
		serviceExternalTotalItems[i].NumCalls = 0
	}
	if serviceExternalTotalItems == nil {
		serviceExternalTotalItems = []model.ServiceExternalItem{}
	}
	return &serviceExternalTotalItems, nil
}
// GetServiceExternal returns, per time bucket and externalHttpUrl, the average
// durationNano and call count of one service's kind='3' spans that have a
// non-null externalHttpUrl, with timestamp in [Start, End]. Buckets are
// StepSeconds/60 minutes wide and ordered newest first; CallRate is NumCalls
// divided by StepSeconds. An empty result is returned as an empty, non-nil
// slice.
func (r *ClickHouseReader) GetServiceExternal(ctx context.Context, queryParams *model.GetServiceOverviewParams) (*[]model.ServiceExternalItem, error) {
	var serviceExternalItems []model.ServiceExternalItem
	// serviceName and the time bounds are bound as parameters so caller-supplied
	// values cannot alter the SQL; only the integer interval is interpolated.
	query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %s minute) as time, avg(durationNano) as avgDuration, count(1) as numCalls, externalHttpUrl FROM %s WHERE serviceName=? AND timestamp>=? AND timestamp<=? AND kind='3' AND externalHttpUrl IS NOT NULL GROUP BY time, externalHttpUrl ORDER BY time DESC", strconv.Itoa(int(queryParams.StepSeconds/60)), r.indexTable)
	err := r.db.Select(&serviceExternalItems, query, queryParams.ServiceName, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query: %w", err)
	}
	for i := range serviceExternalItems {
		// A parse failure is ignored; the bucket's Timestamp is then meaningless.
		timeObj, _ := time.Parse(time.RFC3339Nano, serviceExternalItems[i].Time)
		serviceExternalItems[i].Timestamp = timeObj.UnixNano()
		serviceExternalItems[i].Time = ""
		serviceExternalItems[i].CallRate = float32(serviceExternalItems[i].NumCalls) / float32(queryParams.StepSeconds)
	}
	if serviceExternalItems == nil {
		serviceExternalItems = []model.ServiceExternalItem{}
	}
	return &serviceExternalItems, nil
}
// GetTopEndpoints returns, for every span name of queryParams.ServiceName with
// kind='2' between queryParams.Start and queryParams.End, the p50/p95/p99 of
// durationNano and the number of calls. An empty (non-nil) slice is returned
// when nothing matches.
func (r *ClickHouseReader) GetTopEndpoints(ctx context.Context, queryParams *model.GetTopEndpointsParams) (*[]model.TopEndpointsItem, error) {
	var topEndpointsItems []model.TopEndpointsItem
	// The service name is caller-controlled, so bind it (and the time bounds)
	// as query arguments instead of formatting them into the SQL text.
	query := fmt.Sprintf("SELECT quantile(0.5)(durationNano) as p50, quantile(0.95)(durationNano) as p95, quantile(0.99)(durationNano) as p99, COUNT(1) as numCalls, name FROM %s WHERE timestamp >= ? AND timestamp <= ? AND kind='2' and serviceName=? GROUP BY name", r.indexTable)
	args := []interface{}{
		strconv.FormatInt(queryParams.Start.UnixNano(), 10),
		strconv.FormatInt(queryParams.End.UnixNano(), 10),
		queryParams.ServiceName,
	}
	err := r.db.Select(&topEndpointsItems, query, args...)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query")
	}
	if topEndpointsItems == nil {
		topEndpointsItems = []model.TopEndpointsItem{}
	}
	return &topEndpointsItems, nil
}
// GetUsage counts spans per queryParams.StepHour-hour bucket between
// queryParams.Start and queryParams.End, optionally restricted to
// queryParams.ServiceName when it is non-empty. Buckets are returned in
// ascending time order with Timestamp in epoch nanoseconds.
func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetUsageParams) (*[]model.UsageItem, error) {
	var usageItems []model.UsageItem
	start := strconv.FormatInt(queryParams.Start.UnixNano(), 10)
	end := strconv.FormatInt(queryParams.End.UnixNano(), 10)

	// Caller-supplied values are bound as query arguments so a crafted service
	// name cannot alter the SQL statement.
	var query string
	var args []interface{}
	if len(queryParams.ServiceName) != 0 {
		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE serviceName=? AND timestamp>=? AND timestamp<=? GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable)
		args = []interface{}{queryParams.ServiceName, start, end}
	} else {
		query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d HOUR) as time, count(1) as count FROM %s WHERE timestamp>=? AND timestamp<=? GROUP BY time ORDER BY time ASC", queryParams.StepHour, r.indexTable)
		args = []interface{}{start, end}
	}
	err := r.db.Select(&usageItems, query, args...)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query")
	}
	for i := range usageItems {
		timeObj, _ := time.Parse(time.RFC3339Nano, usageItems[i].Time)
		usageItems[i].Timestamp = int64(timeObj.UnixNano())
		usageItems[i].Time = ""
	}
	if usageItems == nil {
		usageItems = []model.UsageItem{}
	}
	return &usageItems, nil
}
// GetServicesList returns the distinct serviceName values in the index table
// for rows matching `toDate(timestamp) > now() - INTERVAL 1 DAY`.
// The result is never nil on success.
func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) {
	query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable)
	names := []string{}
	selectErr := r.db.Select(&names, query)
	zap.S().Info(query)
	if selectErr == nil {
		return &names, nil
	}
	zap.S().Debug("Error in processing sql query: ", selectErr)
	return nil, fmt.Errorf("Error in processing sql query")
}
// GetTags returns the distinct tag keys (arrayJoin over tagsKeys) found on
// spans of serviceName, for rows matching
// `toDate(timestamp) > now() - INTERVAL 1 DAY`. The service name is bound as a
// query argument.
func (r *ClickHouseReader) GetTags(ctx context.Context, serviceName string) (*[]model.TagItem, error) {
	query := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagsKeys) as tagKeys FROM %s WHERE serviceName=? AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable)
	keys := []model.TagItem{}
	selectErr := r.db.Select(&keys, query, serviceName)
	zap.S().Info(query)
	if selectErr == nil {
		return &keys, nil
	}
	zap.S().Debug("Error in processing sql query: ", selectErr)
	return nil, fmt.Errorf("Error in processing sql query")
}
// GetOperations returns the distinct span names recorded for serviceName,
// for rows matching `toDate(timestamp) > now() - INTERVAL 1 DAY`. The service
// name is bound as a query argument.
func (r *ClickHouseReader) GetOperations(ctx context.Context, serviceName string) (*[]string, error) {
	query := fmt.Sprintf(`SELECT DISTINCT(name) FROM %s WHERE serviceName=? AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.indexTable)
	ops := []string{}
	selectErr := r.db.Select(&ops, query, serviceName)
	zap.S().Info(query)
	if selectErr == nil {
		return &ops, nil
	}
	zap.S().Debug("Error in processing sql query: ", selectErr)
	return nil, fmt.Errorf("Error in processing sql query")
}
// SearchTraces loads every span of the trace traceId and returns them as a
// single SearchSpansResult: Columns names the fields, and each entry of Events
// is one span's values (from GetValues), in query result order.
func (r *ClickHouseReader) SearchTraces(ctx context.Context, traceId string) (*[]model.SearchSpansResult, error) {
	query := fmt.Sprintf("SELECT timestamp, spanID, traceID, serviceName, name, kind, durationNano, tagsKeys, tagsValues, references, events, hasError FROM %s WHERE traceID=?", r.indexTable)
	var spans []model.SearchSpanReponseItem
	selectErr := r.db.Select(&spans, query, traceId)
	zap.S().Info(query)
	if selectErr != nil {
		zap.S().Debug("Error in processing sql query: ", selectErr)
		return nil, fmt.Errorf("Error in processing sql query")
	}

	rows := make([][]interface{}, len(spans))
	for idx, span := range spans {
		rows[idx] = span.GetValues()
	}

	result := []model.SearchSpansResult{{
		Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError"},
		Events:  rows,
	}}
	return &result, nil
}
// GetServiceMapDependencies builds the service dependency graph for spans
// between queryParams.Start and queryParams.End. Each span whose parent span is
// also in the window (and whose parent's service name is non-empty) adds one
// call to the edge parentService -> childService. The result has one item per
// distinct edge, sorted by Parent then Child so the output is deterministic.
func (r *ClickHouseReader) GetServiceMapDependencies(ctx context.Context, queryParams *model.GetServicesParams) (*[]model.ServiceMapDependencyResponseItem, error) {
	serviceMapDependencyItems := []model.ServiceMapDependencyItem{}
	query := fmt.Sprintf(`SELECT spanID, parentSpanID, serviceName FROM %s WHERE timestamp>='%s' AND timestamp<='%s'`, r.indexTable, strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10))
	err := r.db.Select(&serviceMapDependencyItems, query)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query")
	}

	// spanID -> serviceName, used to resolve each span's parent service.
	spanId2ServiceNameMap := make(map[string]string, len(serviceMapDependencyItems))
	for _, item := range serviceMapDependencyItems {
		spanId2ServiceNameMap[item.SpanId] = item.ServiceName
	}

	// Edges are keyed by a struct rather than a "parent-child" string:
	// service names frequently contain '-', so concatenation could merge
	// distinct edges such as "a-b"->"c" and "a"->"b-c".
	type edge struct{ parent, child string }
	serviceMap := make(map[edge]*model.ServiceMapDependencyResponseItem)
	for _, item := range serviceMapDependencyItems {
		key := edge{
			parent: spanId2ServiceNameMap[item.ParentSpanId],
			child:  spanId2ServiceNameMap[item.SpanId],
		}
		// Root spans, and spans whose parent is outside the window, resolve to
		// an empty parent service and are not part of any edge.
		if key.parent == "" {
			continue
		}
		if dep, ok := serviceMap[key]; ok {
			dep.CallCount++
			continue
		}
		serviceMap[key] = &model.ServiceMapDependencyResponseItem{
			Parent:    key.parent,
			Child:     key.child,
			CallCount: 1,
		}
	}

	retMe := make([]model.ServiceMapDependencyResponseItem, 0, len(serviceMap))
	for _, dependency := range serviceMap {
		retMe = append(retMe, *dependency)
	}
	// Map iteration order is random; sort for stable responses.
	sort.Slice(retMe, func(i, j int) bool {
		if retMe[i].Parent != retMe[j].Parent {
			return retMe[i].Parent < retMe[j].Parent
		}
		return retMe[i].Child < retMe[j].Child
	})
	return &retMe, nil
}
// SearchSpansAggregate returns a time series over the span index table between
// queryParams.Start and queryParams.End, bucketed by StepSeconds/60 minutes.
//
// Dimension "duration" aggregates durationNano using AggregationOption
// p50/p95/p99. Dimension "calls" counts spans; with AggregationOption
// "rate_per_sec" each bucket value is divided by StepSeconds. Any other
// combination is rejected with an error before querying.
//
// Optional service, operation, kind, duration and tag filters are appended to
// the WHERE clause with their values bound as query arguments. Unsupported tag
// operators return an error.
func (r *ClickHouseReader) SearchSpansAggregate(ctx context.Context, queryParams *model.SpanSearchAggregatesParams) ([]model.SpanSearchAggregatesResponseItem, error) {
	spanSearchAggregatesResponseItems := []model.SpanSearchAggregatesResponseItem{}

	aggregationQuery := ""
	switch queryParams.Dimension {
	case "duration":
		switch queryParams.AggregationOption {
		case "p50":
			aggregationQuery = " quantile(0.50)(durationNano) as value "
		case "p95":
			aggregationQuery = " quantile(0.95)(durationNano) as value "
		case "p99":
			aggregationQuery = " quantile(0.99)(durationNano) as value "
		}
	case "calls":
		aggregationQuery = " count(*) as value "
	}
	// Without a recognised aggregation the SELECT list would lack the value
	// column and the generated SQL would be malformed; reject up front.
	if aggregationQuery == "" {
		return nil, fmt.Errorf("Aggregate type: %s not supported for dimension: %s", queryParams.AggregationOption, queryParams.Dimension)
	}

	query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregationQuery, r.indexTable)
	args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}
	if len(queryParams.ServiceName) != 0 {
		query = query + " AND serviceName = ?"
		args = append(args, queryParams.ServiceName)
	}
	if len(queryParams.OperationName) != 0 {
		query = query + " AND name = ?"
		args = append(args, queryParams.OperationName)
	}
	if len(queryParams.Kind) != 0 {
		query = query + " AND kind = ?"
		args = append(args, queryParams.Kind)
	}
	if len(queryParams.MinDuration) != 0 {
		query = query + " AND durationNano >= ?"
		args = append(args, queryParams.MinDuration)
	}
	if len(queryParams.MaxDuration) != 0 {
		query = query + " AND durationNano <= ?"
		args = append(args, queryParams.MaxDuration)
	}
	for _, item := range queryParams.Tags {
		// error=true is special-cased: it also matches spans flagged through
		// their status code, not only the literal tag.
		if item.Key == "error" && item.Value == "true" {
			query = query + " AND ( has(tags, 'error:true') OR statusCode>=500 OR statusCode=2)"
			continue
		}
		switch item.Operator {
		case "equals":
			query = query + " AND has(tags, ?)"
			args = append(args, fmt.Sprintf("%s:%s", item.Key, item.Value))
		case "contains":
			query = query + " AND tagsValues[indexOf(tagsKeys, ?)] ILIKE ?"
			args = append(args, item.Key, fmt.Sprintf("%%%s%%", item.Value))
		case "regex":
			query = query + " AND match(tagsValues[indexOf(tagsKeys, ?)], ?)"
			args = append(args, item.Key, item.Value)
		case "isnotnull":
			query = query + " AND has(tagsKeys, ?)"
			args = append(args, item.Key)
		default:
			return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator)
		}
	}
	query = query + " GROUP BY time ORDER BY time"
	err := r.db.Select(&spanSearchAggregatesResponseItems, query, args...)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, fmt.Errorf("Error in processing sql query")
	}
	for i := range spanSearchAggregatesResponseItems {
		item := &spanSearchAggregatesResponseItems[i]
		timeObj, _ := time.Parse(time.RFC3339Nano, item.Time)
		item.Timestamp = int64(timeObj.UnixNano())
		item.Time = ""
		if queryParams.AggregationOption == "rate_per_sec" {
			item.Value = float32(item.Value) / float32(queryParams.StepSeconds)
		}
	}
	return spanSearchAggregatesResponseItems, nil
}
// filteredSpansGroupByColumns maps each groupBy value accepted by
// GetFilteredSpansAggregates to the index-table column that is selected and
// grouped on under the alias "groupBy".
var filteredSpansGroupByColumns = map[string]string{
	"serviceName":  "serviceName",
	"httpCode":     "httpCode",
	"httpMethod":   "httpMethod",
	"httpUrl":      "httpUrl",
	"httpRoute":    "httpRoute",
	"httpHost":     "httpHost",
	"dbName":       "dbName",
	"dbOperation":  "dbOperation",
	"operation":    "name",
	"msgSystem":    "msgSystem",
	"msgOperation": "msgOperation",
	"dbSystem":     "dbSystem",
	"component":    "component",
}

// GetFilteredSpansAggregates computes a time series over the span index table
// between queryParams.Start and queryParams.End, bucketed by StepSeconds/60
// minutes.
//
// Dimension "duration" aggregates durationNano using AggregationOption
// p50/p90/p95/p99/max/min/avg/sum. Dimension "calls" counts spans. With
// AggregationOption "rate_per_sec" each value is divided by StepSeconds.
//
// When GroupBy is set, each bucket is split by the mapped column and returned
// in the item's GroupBy map; otherwise the bucket's value is in Value.
//
// The filters in queryParams are ANDed into the WHERE clause; keys listed in
// queryParams.Exclude are forwarded to the filter helpers via excludeMap.
//
// Errors:
//   - unsupported aggregation options, dimensions and groupBy values yield
//     ErrorBadData;
//   - unsupported tag operators and query failures yield ErrorExec.
func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) {
	excludeMap := make(map[string]struct{})
	for _, e := range queryParams.Exclude {
		// Excluding OperationRequest excludes the OperationDB filter key.
		if e == constants.OperationRequest {
			excludeMap[constants.OperationDB] = struct{}{}
			continue
		}
		excludeMap[e] = struct{}{}
	}

	aggregationQuery := ""
	switch queryParams.Dimension {
	case "duration":
		switch queryParams.AggregationOption {
		case "p50":
			aggregationQuery = " quantile(0.50)(durationNano) as value "
		case "p95":
			aggregationQuery = " quantile(0.95)(durationNano) as value "
		case "p90":
			aggregationQuery = " quantile(0.90)(durationNano) as value "
		case "p99":
			aggregationQuery = " quantile(0.99)(durationNano) as value "
		case "max":
			aggregationQuery = " max(durationNano) as value "
		case "min":
			aggregationQuery = " min(durationNano) as value "
		case "avg":
			aggregationQuery = " avg(durationNano) as value "
		case "sum":
			aggregationQuery = " sum(durationNano) as value "
		default:
			return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("Aggregate type: %s not supported", queryParams.AggregationOption)}
		}
	case "calls":
		aggregationQuery = " count(*) as value "
	default:
		// Any other dimension would leave the SELECT list without a value
		// column and produce malformed SQL.
		return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("Dimension: %s not supported", queryParams.Dimension)}
	}

	groupByColumn := ""
	if queryParams.GroupBy != "" {
		column, ok := filteredSpansGroupByColumns[queryParams.GroupBy]
		if !ok {
			return nil, &model.ApiError{model.ErrorBadData, fmt.Errorf("groupBy type: %s not supported", queryParams.GroupBy)}
		}
		groupByColumn = column
	}

	selectGroupBy := ""
	if groupByColumn != "" {
		selectGroupBy = groupByColumn + " as groupBy, "
	}
	query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s%s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, selectGroupBy, aggregationQuery, r.indexTable)
	args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}

	if len(queryParams.ServiceName) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.ServiceName, constants.ServiceName, &query, args)
	}
	if len(queryParams.HttpRoute) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpRoute, constants.HttpRoute, &query, args)
	}
	if len(queryParams.HttpCode) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpCode, constants.HttpCode, &query, args)
	}
	if len(queryParams.HttpHost) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpHost, constants.HttpHost, &query, args)
	}
	if len(queryParams.HttpMethod) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpMethod, constants.HttpMethod, &query, args)
	}
	if len(queryParams.HttpUrl) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.HttpUrl, constants.HttpUrl, &query, args)
	}
	if len(queryParams.Component) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Component, constants.Component, &query, args)
	}
	if len(queryParams.Operation) > 0 {
		args = buildFilterArrayQuery(ctx, excludeMap, queryParams.Operation, constants.OperationDB, &query, args)
	}
	if len(queryParams.MinDuration) != 0 {
		query = query + " AND durationNano >= ?"
		args = append(args, queryParams.MinDuration)
	}
	if len(queryParams.MaxDuration) != 0 {
		query = query + " AND durationNano <= ?"
		args = append(args, queryParams.MaxDuration)
	}
	query = getStatusFilters(query, queryParams.Status, excludeMap)
	if len(queryParams.Kind) != 0 {
		query = query + " AND kind = ?"
		args = append(args, queryParams.Kind)
	}

	for _, item := range queryParams.Tags {
		switch item.Operator {
		case "in":
			// A single value becomes "AND has(...)"; several become
			// "AND (has(...) OR ... OR has(...))".
			for i, value := range item.Values {
				if i == 0 && i == len(item.Values)-1 {
					query += " AND has(tags, ?)"
				} else if i == 0 && i != len(item.Values)-1 {
					query += " AND (has(tags, ?)"
				} else if i != 0 && i == len(item.Values)-1 {
					query += " OR has(tags, ?))"
				} else {
					query += " OR has(tags, ?)"
				}
				args = append(args, fmt.Sprintf("%s:%s", item.Key, value))
			}
		case "not in":
			// Same shape as "in", negated as a whole.
			for i, value := range item.Values {
				if i == 0 && i == len(item.Values)-1 {
					query += " AND NOT has(tags, ?)"
				} else if i == 0 && i != len(item.Values)-1 {
					query += " AND NOT (has(tags, ?)"
				} else if i != 0 && i == len(item.Values)-1 {
					query += " OR has(tags, ?))"
				} else {
					query += " OR has(tags, ?)"
				}
				args = append(args, fmt.Sprintf("%s:%s", item.Key, value))
			}
		case "regex":
			if len(item.Values) != 1 {
				return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Regex tag operator should only have one value")}
			}
			query = query + " AND match(tagsValues[indexOf(tagsKeys, ?)], ?)"
			args = append(args, item.Key, item.Values[0])
		case "isnotnull":
			// NOTE(review): one has() is added per supplied value, and none when
			// Values is empty — confirm that an empty Values list should skip
			// the filter.
			for range item.Values {
				query = query + " AND has(tagsKeys, ?)"
				args = append(args, item.Key)
			}
		default:
			return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Tag Operator %s not supported", item.Operator)}
		}
	}

	if groupByColumn != "" {
		query = query + " GROUP BY time, " + groupByColumn + " as groupBy ORDER BY time"
	} else {
		query = query + " GROUP BY time ORDER BY time"
	}

	dbItems := []model.SpanAggregatesDBResponseItem{}
	err := r.db.Select(&dbItems, query, args...)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query")}
	}

	response := model.GetFilteredSpansAggregatesResponse{
		Items: map[int64]model.SpanAggregatesResponseItem{},
	}
	for i := range dbItems {
		item := &dbItems[i]
		timeObj, _ := time.Parse(time.RFC3339Nano, item.Time)
		item.Timestamp = int64(timeObj.UnixNano())
		item.Time = ""
		if queryParams.AggregationOption == "rate_per_sec" {
			item.Value = float32(item.Value) / float32(queryParams.StepSeconds)
		}

		responseElement, ok := response.Items[item.Timestamp]
		if !ok {
			// First row for this bucket. Grouped rows with an empty group value
			// are dropped.
			if queryParams.GroupBy != "" && item.GroupBy.String != "" {
				response.Items[item.Timestamp] = model.SpanAggregatesResponseItem{
					Timestamp: item.Timestamp,
					GroupBy:   map[string]float32{item.GroupBy.String: item.Value},
				}
			} else if queryParams.GroupBy == "" {
				response.Items[item.Timestamp] = model.SpanAggregatesResponseItem{
					Timestamp: item.Timestamp,
					Value:     item.Value,
				}
			}
			continue
		}
		if queryParams.GroupBy != "" && item.GroupBy.String != "" {
			responseElement.GroupBy[item.GroupBy.String] = item.Value
		}
		response.Items[item.Timestamp] = responseElement
	}
	return &response, nil
}
// SetTTL installs a TTL rule on either the trace index table
// (default.signoz_index) or the metrics samples table (signoz_metrics.samples),
// as selected by params.Type.
//
// Rows are deleted params.DelDuration seconds after their timestamp. When
// params.ColdStorageVolume is non-empty, two more things happen:
//   - the table's storage policy is first set to 'tiered';
//   - an extra rule moves rows to that volume params.ToColdStorageDuration
//     seconds after their timestamp.
//
// Unknown TTL types and failing ALTER statements yield an ErrorExec ApiError.
func (r *ClickHouseReader) SetTTL(ctx context.Context,
	params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
	// The volume name is embedded in a single-quoted SQL literal; escape
	// backslashes and quotes so caller input cannot terminate the literal and
	// inject further SQL.
	coldStorageVolume := strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(params.ColdStorageVolume)

	var req, tableName string
	switch params.Type {
	case constants.TraceTTL:
		// Qualify with the database so the storage-policy statement below
		// targets the same table as the TTL statement, independent of the
		// connection's default database.
		tableName = "default." + signozTraceTableName
		req = fmt.Sprintf(
			"ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
			tableName, params.DelDuration)
		if len(params.ColdStorageVolume) > 0 {
			req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'",
				params.ToColdStorageDuration, coldStorageVolume)
		}
	case constants.MetricsTTL:
		tableName = signozMetricDBName + "." + signozSampleName
		// timestamp_ms is converted from milliseconds to a UTC DateTime.
		req = fmt.Sprintf(
			"ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
				"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
		if len(params.ColdStorageVolume) > 0 {
			req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+
				" + INTERVAL %v SECOND TO VOLUME '%s'",
				params.ToColdStorageDuration, coldStorageVolume)
		}
	default:
		return nil, &model.ApiError{model.ErrorExec,
			fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v",
				params.Type)}
	}

	// The move rule needs a storage policy that contains the target volume.
	// Setting the policy again when it is already 'tiered' is harmless.
	if len(params.ColdStorageVolume) > 0 {
		policyReq := fmt.Sprintf("ALTER TABLE %s MODIFY SETTING storage_policy='tiered'", tableName)
		zap.S().Debugf("Executing Storage policy request: %s\n", policyReq)
		if _, err := r.db.Exec(policyReq); err != nil {
			zap.S().Error(fmt.Errorf("error while setting storage policy. Err=%v", err))
			return nil, &model.ApiError{model.ErrorExec,
				fmt.Errorf("error while setting storage policy. Err=%v", err)}
		}
	}

	zap.S().Debugf("Executing TTL request: %s\n", req)
	if _, err := r.db.Exec(req); err != nil {
		zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
		return nil, &model.ApiError{model.ErrorExec,
			fmt.Errorf("error while setting ttl. Err=%v", err)}
	}
	return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
// GetDisks lists the disks configured in the ClickHouse server, reading the
// name and type columns of system.disks into one DiskItem per row.
func (r *ClickHouseReader) GetDisks(ctx context.Context) (*[]model.DiskItem, *model.ApiError) {
	const query = "SELECT name,type FROM system.disks"
	disks := []model.DiskItem{}
	selectErr := r.db.Select(&disks, query)
	if selectErr != nil {
		zap.S().Debug("Error in processing sql query: ", selectErr)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while getting disks. Err=%v", selectErr)}
	}
	zap.S().Infof("Got response: %+v\n", disks)
	return &disks, nil
}
// ttlIntervalRegexp matches one TTL interval inside a table's engine_full text.
// Examples: "toIntervalSecond(604800)" and
// "toIntervalSecond(86400) TO VOLUME 'cold'".
// Group 1 is the interval in seconds. Group 2 is non-empty when the rule moves
// data to a volume or disk instead of deleting it. Compiled once at package
// level rather than on every call.
var ttlIntervalRegexp = regexp.MustCompile(`toIntervalSecond\(([0-9]+)\)( TO (?:VOLUME|DISK))?`)

// GetTTL reports the configured TTLs, in whole hours (seconds / 3600,
// truncated), parsed from system.tables.engine_full.
//   - TraceTTL: only the trace fields are filled.
//   - MetricsTTL: only the metrics fields are filled.
//   - Any other type: both tables are queried.
//
// A TTL that is not configured, or whose interval cannot be parsed, is
// reported as -1. Query failures yield an ErrorExec ApiError.
func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
	// parseTTL returns the first delete TTL and the first move TTL (in hours)
	// found in queryResp. Each interval is classified by whether it is followed
	// by TO VOLUME/DISK, rather than assuming the first interval is the delete
	// rule.
	parseTTL := func(queryResp string) (int, int) {
		zap.S().Debugf("Parsing TTL from: %s", queryResp)
		delTTL, moveTTL := -1, -1
		for _, m := range ttlIntervalRegexp.FindAllStringSubmatch(queryResp, -1) {
			seconds, err := strconv.Atoi(m[1])
			if err != nil {
				return -1, -1
			}
			if m[2] != "" {
				if moveTTL == -1 {
					moveTTL = seconds / 3600
				}
			} else if delTTL == -1 {
				delTTL = seconds / 3600
			}
		}
		return delTTL, moveTTL
	}

	// getEngineFull loads engine_full for database.table. Filtering on the
	// database as well as the table name avoids reading a same-named table
	// that lives in another database.
	getEngineFull := func(database, table string) (*model.DBResponseTTL, *model.ApiError) {
		var dbResp model.DBResponseTTL
		query := "SELECT engine_full FROM system.tables WHERE database=? AND name=?"
		err := r.db.QueryRowx(query, database, table).StructScan(&dbResp)
		if err != nil {
			zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err))
			return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("error while getting ttl. Err=%v", err)}
		}
		return &dbResp, nil
	}
	getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
		return getEngineFull(signozMetricDBName, signozSampleName)
	}
	getTracesTTL := func() (*model.DBResponseTTL, *model.ApiError) {
		// SetTTL alters default.signoz_index, so read the TTL from the same table.
		return getEngineFull("default", signozTraceTableName)
	}

	switch ttlParams.Type {
	case constants.TraceTTL:
		dbResp, err := getTracesTTL()
		if err != nil {
			return nil, err
		}
		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
		return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL}, nil
	case constants.MetricsTTL:
		dbResp, err := getMetricsTTL()
		if err != nil {
			return nil, err
		}
		delTTL, moveTTL := parseTTL(dbResp.EngineFull)
		return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL}, nil
	}

	tracesResp, err := getTracesTTL()
	if err != nil {
		return nil, err
	}
	metricsResp, err := getMetricsTTL()
	if err != nil {
		return nil, err
	}
	tracesDelTTL, tracesMoveTTL := parseTTL(tracesResp.EngineFull)
	metricsDelTTL, metricsMoveTTL := parseTTL(metricsResp.EngineFull)
	return &model.GetTTLResponseItem{
		TracesTime:      tracesDelTTL,
		TracesMoveTime:  tracesMoveTTL,
		MetricsTime:     metricsDelTTL,
		MetricsMoveTime: metricsMoveTTL,
	}, nil
}
// GetErrors groups rows of the error table between queryParams.Start and
// queryParams.End by (serviceName, exceptionType, exceptionMessage). For each
// group it returns the occurrence count and the first and last seen
// timestamps. An empty (non-nil) slice is returned when there are no errors in
// the window.
func (r *ClickHouseReader) GetErrors(ctx context.Context, queryParams *model.GetErrorsParams) (*[]model.Error, *model.ApiError) {
	// Start from an empty slice so "no errors" serialises as [] rather than
	// null, matching the other list methods of this reader.
	getErrorReponses := []model.Error{}
	query := fmt.Sprintf("SELECT exceptionType, exceptionMessage, count() AS exceptionCount, min(timestamp) as firstSeen, max(timestamp) as lastSeen, serviceName FROM %s WHERE timestamp >= ? AND timestamp <= ? GROUP BY serviceName, exceptionType, exceptionMessage", r.errorTable)
	args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}
	err := r.db.Select(&getErrorReponses, query, args...)
	zap.S().Info(query)
	if err != nil {
		zap.S().Debug("Error in processing sql query: ", err)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query")}
	}
	return &getErrorReponses, nil
}
// GetErrorForId fetches the error row whose errorID equals queryParams.ErrorID.
// The row also carries olderErrorId/newerErrorId: the errorIDs of the
// previous/next rows in a window over the whole error table ordered by
// exceptionType, serviceName, timestamp.
//
// Returns (nil, nil) when no row matches. A missing ErrorID or a query failure
// yields an ErrorExec ApiError.
func (r *ClickHouseReader) GetErrorForId(ctx context.Context, queryParams *model.GetErrorParams) (*model.ErrorWithSpan, *model.ApiError) {
	errorID := queryParams.ErrorID
	if errorID == "" {
		zap.S().Debug("errorId missing from params")
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("ErrorID missing from params")}
	}

	query := fmt.Sprintf("SELECT spanID, traceID, errorID, timestamp, serviceName, exceptionType, exceptionMessage, excepionStacktrace, exceptionEscaped, olderErrorId, newerErrorId FROM (SELECT *, lagInFrame(errorID) over w as olderErrorId, leadInFrame(errorID) over w as newerErrorId FROM %s window w as (ORDER BY exceptionType, serviceName, timestamp rows between unbounded preceding and unbounded following)) WHERE errorID = ?", r.errorTable)
	var found model.ErrorWithSpan
	getErr := r.db.Get(&found, query, errorID)
	zap.S().Info(query)
	switch {
	case getErr == sql.ErrNoRows:
		return nil, nil
	case getErr != nil:
		zap.S().Debug("Error in processing sql query: ", getErr)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query")}
	}
	return &found, nil
}
// GetErrorForType fetches one error row for queryParams.ServiceName and
// queryParams.ErrorType. Its olderErrorId/newerErrorId are the neighbouring
// errorIDs in a window over that service/type ordered by timestamp.
// NOTE(review): the outer query uses `limit 1` without an ORDER BY, so which
// occurrence is returned is not pinned by the SQL — confirm this is intended.
//
// Returns (nil, nil) when no row matches. Missing parameters or a query
// failure yield an ErrorExec ApiError.
func (r *ClickHouseReader) GetErrorForType(ctx context.Context, queryParams *model.GetErrorParams) (*model.ErrorWithSpan, *model.ApiError) {
	serviceName, errorType := queryParams.ServiceName, queryParams.ErrorType
	if errorType == "" || serviceName == "" {
		zap.S().Debug("errorType/serviceName missing from params")
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("ErrorType/serviceName missing from params")}
	}

	query := fmt.Sprintf("SELECT spanID, traceID, errorID, timestamp , serviceName, exceptionType, exceptionMessage, excepionStacktrace, exceptionEscaped, newerErrorId, olderErrorId FROM (SELECT *, lagInFrame(errorID) over w as olderErrorId, leadInFrame(errorID) over w as newerErrorId FROM %s WHERE serviceName = ? AND exceptionType = ? window w as (ORDER BY timestamp rows between unbounded preceding and unbounded following)) limit 1", r.errorTable)
	var found model.ErrorWithSpan
	getErr := r.db.Get(&found, query, serviceName, errorType)
	zap.S().Info(query)
	switch {
	case getErr == sql.ErrNoRows:
		return nil, nil
	case getErr != nil:
		zap.S().Debug("Error in processing sql query: ", getErr)
		return nil, &model.ApiError{model.ErrorExec, fmt.Errorf("Error in processing sql query")}
	}
	return &found, nil
}