mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-08-10 06:19:04 +08:00
chore: address large number of staticcheck issues (#5176)
* chore: address large number of staticcheck issues * chore: fix tests * chore: fix more issue * chore: fix options
This commit is contained in:
parent
1645523ae9
commit
2f949d2738
@ -14,7 +14,6 @@ import (
|
||||
|
||||
"go.signoz.io/signoz/ee/query-service/constants"
|
||||
"go.signoz.io/signoz/ee/query-service/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
|
||||
basemodel "go.signoz.io/signoz/pkg/query-service/model"
|
||||
)
|
||||
@ -51,7 +50,7 @@ func (ah *APIHandler) loginUser(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// if all looks good, call auth
|
||||
resp, err := auth.Login(ctx, &req)
|
||||
resp, err := baseauth.Login(ctx, &req)
|
||||
if ah.HandleError(w, err, http.StatusUnauthorized) {
|
||||
return
|
||||
}
|
||||
@ -130,7 +129,7 @@ func (ah *APIHandler) registerUser(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
} else {
|
||||
// no-sso, validate password
|
||||
if err := auth.ValidatePassword(req.Password); err != nil {
|
||||
if err := baseauth.ValidatePassword(req.Password); err != nil {
|
||||
RespondError(w, model.InternalError(fmt.Errorf("password is not in a valid format")), nil)
|
||||
return
|
||||
}
|
||||
@ -241,6 +240,11 @@ func (ah *APIHandler) receiveGoogleAuth(w http.ResponseWriter, r *http.Request)
|
||||
// prepare google callback handler using parsedState -
|
||||
// which contains redirect URL (front-end endpoint)
|
||||
callbackHandler, err := domain.PrepareGoogleOAuthProvider(parsedState)
|
||||
if err != nil {
|
||||
zap.L().Error("[receiveGoogleAuth] failed to prepare google oauth provider", zap.String("domain", domain.String()), zap.Error(err))
|
||||
handleSsoError(w, r, redirectUri)
|
||||
return
|
||||
}
|
||||
|
||||
identity, err := callbackHandler.HandleCallback(r)
|
||||
if err != nil {
|
||||
|
@ -26,10 +26,10 @@ func (r *ClickhouseReader) GetMetricResultEE(ctx context.Context, query string)
|
||||
|
||||
var hash string
|
||||
// If getSubTreeSpans function is used in the clickhouse query
|
||||
if strings.Index(query, "getSubTreeSpans(") != -1 {
|
||||
if strings.Contains(query, "getSubTreeSpans(") {
|
||||
var err error
|
||||
query, hash, err = r.getSubTreeSpansCustomFunction(ctx, query, hash)
|
||||
if err == fmt.Errorf("No spans found for the given query") {
|
||||
if err == fmt.Errorf("no spans found for the given query") {
|
||||
return nil, "", nil
|
||||
}
|
||||
if err != nil {
|
||||
@ -183,7 +183,7 @@ func (r *ClickhouseReader) getSubTreeSpansCustomFunction(ctx context.Context, qu
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return query, hash, fmt.Errorf("Error in processing sql query")
|
||||
return query, hash, fmt.Errorf("error in processing sql query")
|
||||
}
|
||||
|
||||
var searchScanResponses []basemodel.SearchSpanDBResponseItem
|
||||
@ -193,14 +193,14 @@ func (r *ClickhouseReader) getSubTreeSpansCustomFunction(ctx context.Context, qu
|
||||
modelQuery := fmt.Sprintf("SELECT timestamp, traceID, model FROM %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable)
|
||||
|
||||
if len(getSpansSubQueryDBResponses) == 0 {
|
||||
return query, hash, fmt.Errorf("No spans found for the given query")
|
||||
return query, hash, fmt.Errorf("no spans found for the given query")
|
||||
}
|
||||
zap.L().Debug("Executing query to fetch all the spans from the same TraceID: ", zap.String("modelQuery", modelQuery))
|
||||
err = r.conn.Select(ctx, &searchScanResponses, modelQuery, getSpansSubQueryDBResponses[0].TraceID)
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return query, hash, fmt.Errorf("Error in processing sql query")
|
||||
return query, hash, fmt.Errorf("error in processing sql query")
|
||||
}
|
||||
|
||||
// Process model to fetch the spans
|
||||
@ -263,6 +263,7 @@ func (r *ClickhouseReader) getSubTreeSpansCustomFunction(ctx context.Context, qu
|
||||
return query, hash, nil
|
||||
}
|
||||
|
||||
//lint:ignore SA4009 return hash is feeded to the query
|
||||
func processQuery(query string, hash string) (string, string, string) {
|
||||
re3 := regexp.MustCompile(`getSubTreeSpans`)
|
||||
|
||||
|
@ -61,7 +61,7 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI
|
||||
|
||||
// If the target span is not found, return span not found error
|
||||
if targetSpan == nil {
|
||||
return nil, errors.New("Span not found")
|
||||
return nil, errors.New("span not found")
|
||||
}
|
||||
|
||||
// Build the final result
|
||||
@ -118,8 +118,8 @@ func SmartTraceAlgorithm(payload []basemodel.SearchSpanResponseItem, targetSpanI
|
||||
}
|
||||
|
||||
searchSpansResult := []basemodel.SearchSpansResult{{
|
||||
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError"},
|
||||
Events: make([][]interface{}, len(resultSpansSet)),
|
||||
Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError"},
|
||||
Events: make([][]interface{}, len(resultSpansSet)),
|
||||
IsSubTree: true,
|
||||
},
|
||||
}
|
||||
@ -219,7 +219,7 @@ func breadthFirstSearch(spansPtr *model.SpanForTraceDetails, targetId string) (*
|
||||
}
|
||||
|
||||
for _, child := range current.Children {
|
||||
if ok, _ := visited[child.SpanID]; !ok {
|
||||
if ok := visited[child.SpanID]; !ok {
|
||||
queue = append(queue, child)
|
||||
}
|
||||
}
|
||||
|
@ -28,7 +28,6 @@ import (
|
||||
"go.signoz.io/signoz/ee/query-service/integrations/gateway"
|
||||
"go.signoz.io/signoz/ee/query-service/interfaces"
|
||||
baseauth "go.signoz.io/signoz/pkg/query-service/auth"
|
||||
baseInterface "go.signoz.io/signoz/pkg/query-service/interfaces"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
|
||||
licensepkg "go.signoz.io/signoz/ee/query-service/license"
|
||||
@ -79,9 +78,7 @@ type ServerOptions struct {
|
||||
// Server runs HTTP api service
|
||||
type Server struct {
|
||||
serverOptions *ServerOptions
|
||||
conn net.Listener
|
||||
ruleManager *rules.Manager
|
||||
separatePorts bool
|
||||
|
||||
// public http router
|
||||
httpConn net.Listener
|
||||
@ -91,9 +88,6 @@ type Server struct {
|
||||
privateConn net.Listener
|
||||
privateHTTP *http.Server
|
||||
|
||||
// feature flags
|
||||
featureLookup baseint.FeatureLookup
|
||||
|
||||
// Usage manager
|
||||
usageManager *usage.Manager
|
||||
|
||||
@ -317,7 +311,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
|
||||
func (s *Server) createPrivateServer(apiHandler *api.APIHandler) (*http.Server, error) {
|
||||
|
||||
r := mux.NewRouter()
|
||||
r := baseapp.NewRouter()
|
||||
|
||||
r.Use(baseapp.LogCommentEnricher)
|
||||
r.Use(setTimeoutMiddleware)
|
||||
@ -344,7 +338,7 @@ func (s *Server) createPrivateServer(apiHandler *api.APIHandler) (*http.Server,
|
||||
|
||||
func (s *Server) createPublicServer(apiHandler *api.APIHandler) (*http.Server, error) {
|
||||
|
||||
r := mux.NewRouter()
|
||||
r := baseapp.NewRouter()
|
||||
|
||||
// add auth middleware
|
||||
getUserFromRequest := func(r *http.Request) (*basemodel.UserPayload, error) {
|
||||
@ -385,7 +379,7 @@ func loggingMiddleware(next http.Handler) http.Handler {
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.L().Info(path+"\ttimeTaken:"+time.Now().Sub(startTime).String(), zap.Duration("timeTaken", time.Now().Sub(startTime)), zap.String("path", path))
|
||||
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path))
|
||||
})
|
||||
}
|
||||
|
||||
@ -397,7 +391,7 @@ func loggingMiddlewarePrivate(next http.Handler) http.Handler {
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.L().Info(path+"\tprivatePort: true \ttimeTaken"+time.Now().Sub(startTime).String(), zap.Duration("timeTaken", time.Now().Sub(startTime)), zap.String("path", path), zap.Bool("tprivatePort", true))
|
||||
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path), zap.Bool("tprivatePort", true))
|
||||
})
|
||||
}
|
||||
|
||||
@ -711,7 +705,7 @@ func makeRulesManager(
|
||||
db *sqlx.DB,
|
||||
ch baseint.Reader,
|
||||
disableRules bool,
|
||||
fm baseInterface.FeatureLookup) (*rules.Manager, error) {
|
||||
fm baseint.FeatureLookup) (*rules.Manager, error) {
|
||||
|
||||
// create engine
|
||||
pqle, err := pqle.FromConfigPath(promConfigPath)
|
||||
|
@ -2,11 +2,6 @@ package signozio
|
||||
|
||||
type status string
|
||||
|
||||
const (
|
||||
statusSuccess status = "success"
|
||||
statusError status = "error"
|
||||
)
|
||||
|
||||
type ActivationResult struct {
|
||||
Status status `json:"status"`
|
||||
Data *ActivationResponse `json:"data,omitempty"`
|
||||
|
@ -111,7 +111,7 @@ func (r *Repo) UpdatePlanDetails(ctx context.Context,
|
||||
planDetails string) error {
|
||||
|
||||
if key == "" {
|
||||
return fmt.Errorf("Update Plan Details failed: license key is required")
|
||||
return fmt.Errorf("update plan details failed: license key is required")
|
||||
}
|
||||
|
||||
query := `UPDATE licenses
|
||||
|
@ -32,7 +32,7 @@ func InitDB(db *sqlx.DB) error {
|
||||
|
||||
_, err = db.Exec(table_schema)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error in creating licenses table: %s", err.Error())
|
||||
return fmt.Errorf("error in creating licenses table: %s", err.Error())
|
||||
}
|
||||
|
||||
table_schema = `CREATE TABLE IF NOT EXISTS feature_status (
|
||||
@ -45,7 +45,7 @@ func InitDB(db *sqlx.DB) error {
|
||||
|
||||
_, err = db.Exec(table_schema)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error in creating feature_status table: %s", err.Error())
|
||||
return fmt.Errorf("error in creating feature_status table: %s", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -14,7 +14,6 @@ import (
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
|
||||
"go.signoz.io/signoz/ee/query-service/app"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
"go.signoz.io/signoz/pkg/query-service/constants"
|
||||
baseconst "go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/migrate"
|
||||
"go.signoz.io/signoz/pkg/query-service/version"
|
||||
@ -52,7 +51,8 @@ func initZapLog(enableQueryServiceLogOTLPExport bool) *zap.Logger {
|
||||
)
|
||||
|
||||
if enableQueryServiceLogOTLPExport {
|
||||
ctx, _ := context.WithTimeout(ctx, time.Second*30)
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||
defer cancel()
|
||||
conn, err := grpc.DialContext(ctx, baseconst.OTLPTarget, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to establish connection: %v", err)
|
||||
@ -148,7 +148,7 @@ func main() {
|
||||
zap.L().Info("JWT secret key set successfully.")
|
||||
}
|
||||
|
||||
if err := migrate.Migrate(constants.RELATIONAL_DATASOURCE_PATH); err != nil {
|
||||
if err := migrate.Migrate(baseconst.RELATIONAL_DATASOURCE_PATH); err != nil {
|
||||
zap.L().Error("Failed to migrate", zap.Error(err))
|
||||
} else {
|
||||
zap.L().Info("Migration successful")
|
||||
|
@ -104,7 +104,7 @@ func (od *OrgDomain) GetSAMLCert() string {
|
||||
// requesting OAuth and also used in processing response from google
|
||||
func (od *OrgDomain) PrepareGoogleOAuthProvider(siteUrl *url.URL) (sso.OAuthCallbackProvider, error) {
|
||||
if od.GoogleAuthConfig == nil {
|
||||
return nil, fmt.Errorf("Google auth is not setup correctly for this domain")
|
||||
return nil, fmt.Errorf("GOOGLE OAUTH is not setup correctly for this domain")
|
||||
}
|
||||
|
||||
return od.GoogleAuthConfig.GetProvider(od.Name, siteUrl)
|
||||
|
@ -53,7 +53,7 @@ func New(dbType string, modelDao dao.ModelDao, licenseRepo *license.Repo, clickh
|
||||
tenantID := ""
|
||||
if len(hostNameRegexMatches) == 2 {
|
||||
tenantID = hostNameRegexMatches[1]
|
||||
tenantID = strings.TrimRight(tenantID, "-clickhouse")
|
||||
tenantID = strings.TrimSuffix(tenantID, "-clickhouse")
|
||||
}
|
||||
|
||||
m := &Manager{
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
@ -15,10 +14,6 @@ import (
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(2000)
|
||||
}
|
||||
|
||||
// Repo handles DDL and DML ops on ingestion rules
|
||||
type Repo struct {
|
||||
db *sqlx.DB
|
||||
|
@ -64,7 +64,7 @@ func (am *AuthMiddleware) EditAccess(f func(http.ResponseWriter, *http.Request))
|
||||
if !(auth.IsEditor(user) || auth.IsAdmin(user)) {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorForbidden,
|
||||
Err: errors.New("API is accessible to editors/admins."),
|
||||
Err: errors.New("API is accessible to editors/admins"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
@ -88,7 +88,7 @@ func (am *AuthMiddleware) SelfAccess(f func(http.ResponseWriter, *http.Request))
|
||||
if !(auth.IsSelfAccessRequest(user, id) || auth.IsAdmin(user)) {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorForbidden,
|
||||
Err: errors.New("API is accessible for self access or to the admins."),
|
||||
Err: errors.New("API is accessible for self access or to the admins"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
|
@ -42,17 +42,6 @@ const (
|
||||
defaultEncoding Encoding = EncodingJSON
|
||||
)
|
||||
|
||||
const (
|
||||
suffixEnabled = ".enabled"
|
||||
suffixDatasource = ".datasource"
|
||||
suffixOperationsTable = ".operations-table"
|
||||
suffixIndexTable = ".index-table"
|
||||
suffixSpansTable = ".spans-table"
|
||||
suffixWriteBatchDelay = ".write-batch-delay"
|
||||
suffixWriteBatchSize = ".write-batch-size"
|
||||
suffixEncoding = ".encoding"
|
||||
)
|
||||
|
||||
// NamespaceConfig is Clickhouse's internal configuration data
|
||||
type namespaceConfig struct {
|
||||
namespace string
|
||||
|
@ -28,7 +28,6 @@ import (
|
||||
"github.com/prometheus/common/promlog"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/discovery"
|
||||
sd_config "github.com/prometheus/prometheus/discovery"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
|
||||
"github.com/prometheus/prometheus/scrape"
|
||||
@ -291,7 +290,7 @@ func (r *ClickHouseReader) Start(readerReady chan bool) {
|
||||
// they need to read the most updated config when receiving the new targets list.
|
||||
scrapeManager.ApplyConfig,
|
||||
func(cfg *config.Config) error {
|
||||
c := make(map[string]sd_config.Configs)
|
||||
c := make(map[string]discovery.Configs)
|
||||
for _, v := range cfg.ScrapeConfigs {
|
||||
c[v.JobName] = v.ServiceDiscoveryConfigs
|
||||
}
|
||||
@ -462,7 +461,7 @@ func (r *ClickHouseReader) LoadChannel(channel *model.ChannelItem) *model.ApiErr
|
||||
if response.StatusCode > 299 {
|
||||
responseData, _ := io.ReadAll(response.Body)
|
||||
|
||||
err := fmt.Errorf("Error in getting 2xx response in API call to alertmanager/v1/receivers")
|
||||
err := fmt.Errorf("error in getting 2xx response in API call to alertmanager/v1/receivers")
|
||||
zap.L().Error("Error in getting 2xx response in API call to alertmanager/v1/receivers", zap.String("Status", response.Status), zap.String("Data", string(responseData)))
|
||||
|
||||
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
|
||||
@ -547,7 +546,7 @@ func (r *ClickHouseReader) GetChannels() (*[]model.ChannelItem, *model.ApiError)
|
||||
|
||||
channels := []model.ChannelItem{}
|
||||
|
||||
query := fmt.Sprintf("SELECT id, created_at, updated_at, name, type, data data FROM notification_channels")
|
||||
query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels"
|
||||
|
||||
err := r.localDB.Select(&channels, query)
|
||||
|
||||
@ -756,7 +755,7 @@ func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, erro
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
return nil, fmt.Errorf("error in processing sql query")
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
@ -938,7 +937,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
|
||||
}
|
||||
ops, ok := (*topLevelOps)[queryParams.ServiceName]
|
||||
if !ok {
|
||||
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("Service not found")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("service not found")}
|
||||
}
|
||||
|
||||
namedArgs := []interface{}{
|
||||
@ -1006,7 +1005,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
m := make(map[int64]int)
|
||||
@ -1137,7 +1136,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.ServiceName != "" {
|
||||
@ -1154,7 +1153,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.HttpRoute != "" {
|
||||
@ -1171,7 +1170,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.HttpUrl != "" {
|
||||
@ -1188,7 +1187,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.HttpMethod != "" {
|
||||
@ -1205,7 +1204,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.HttpHost != "" {
|
||||
@ -1222,7 +1221,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
for _, service := range dBResponse {
|
||||
if service.Operation != "" {
|
||||
@ -1238,7 +1237,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
|
||||
finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable)
|
||||
@ -1249,7 +1248,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
if len(dBResponse) > 0 && len(dBResponse2) > 0 {
|
||||
traceFilterReponse.Status = map[string]uint64{"ok": dBResponse2[0].NumTotal, "error": dBResponse[0].NumTotal}
|
||||
@ -1273,7 +1272,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
zap.L().Info(finalQuery)
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
if len(dBResponse) > 0 {
|
||||
traceFilterReponse.Duration = map[string]uint64{"minDuration": dBResponse[0].Min, "maxDuration": dBResponse[0].Max}
|
||||
@ -1289,7 +1288,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
|
||||
finalQuery = fmt.Sprintf("SELECT durationNano as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.durationTable)
|
||||
@ -1301,7 +1300,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query: %s", err)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)}
|
||||
}
|
||||
if len(dBResponse) > 0 {
|
||||
traceFilterReponse.Duration["minDuration"] = dBResponse[0].NumTotal
|
||||
@ -1460,7 +1459,7 @@ func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *mo
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
if queryParams.Order == constants.Descending {
|
||||
query = query + " ORDER BY timestamp DESC"
|
||||
@ -1498,7 +1497,7 @@ func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *mo
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
getFilterSpansResponse := model.GetFilterSpansResponse{
|
||||
@ -1732,7 +1731,7 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
tagFiltersResult := model.TagFilters{
|
||||
StringTagKeys: make([]string, 0),
|
||||
@ -1847,7 +1846,7 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model.
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
cleanedTagValues := model.TagValues{
|
||||
@ -1939,7 +1938,7 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, fmt.Errorf("Error in processing sql query")
|
||||
return nil, fmt.Errorf("error in processing sql query")
|
||||
}
|
||||
|
||||
for i := range usageItems {
|
||||
@ -1976,7 +1975,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc
|
||||
}
|
||||
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, userEmail, true, false)
|
||||
}
|
||||
return nil, fmt.Errorf("Max spans allowed in trace limit reached, please contact support for more details")
|
||||
return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details")
|
||||
}
|
||||
|
||||
userEmail, err := auth.GetEmailFromJwt(ctx)
|
||||
@ -2130,7 +2129,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
|
||||
case "sum":
|
||||
aggregation_query = " sum(durationNano) as value "
|
||||
default:
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("Aggregate type: %s not supported", queryParams.AggregationOption)}
|
||||
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("aggregate type: %s not supported", queryParams.AggregationOption)}
|
||||
}
|
||||
} else if queryParams.Dimension == "calls" {
|
||||
aggregation_query = " count(*) as value "
|
||||
@ -2236,7 +2235,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
GetFilteredSpansAggregatesResponse := model.GetFilteredSpansAggregatesResponse{
|
||||
@ -2306,7 +2305,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
tableName := getLocalTableName(tableName)
|
||||
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err != nil {
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending {
|
||||
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
|
||||
@ -2341,7 +2340,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
}
|
||||
return
|
||||
}
|
||||
req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1;")
|
||||
req += " SETTINGS distributed_ddl_task_timeout = -1;"
|
||||
zap.L().Error("Executing TTL request: ", zap.String("request", req))
|
||||
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err := r.db.Exec(context.Background(), req); err != nil {
|
||||
@ -2404,7 +2403,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
}
|
||||
return
|
||||
}
|
||||
req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1")
|
||||
req += " SETTINGS distributed_ddl_task_timeout = -1"
|
||||
zap.L().Info("Executing TTL request: ", zap.String("request", req))
|
||||
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err := r.db.Exec(ctx, req); err != nil {
|
||||
@ -2461,7 +2460,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
|
||||
}
|
||||
return
|
||||
}
|
||||
req += fmt.Sprint(" SETTINGS distributed_ddl_task_timeout = -1")
|
||||
req += " SETTINGS distributed_ddl_task_timeout = -1"
|
||||
zap.L().Info("Executing TTL request: ", zap.String("request", req))
|
||||
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
|
||||
if err := r.db.Exec(ctx, req); err != nil {
|
||||
@ -2517,7 +2516,7 @@ func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, tableName str
|
||||
}
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
return model.TTLStatusItem{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||
}
|
||||
return statusItem[0], nil
|
||||
}
|
||||
@ -2533,7 +2532,7 @@ func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, tableNameArray
|
||||
return "", nil
|
||||
}
|
||||
if err != nil {
|
||||
return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
|
||||
return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
|
||||
}
|
||||
if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
|
||||
status = constants.StatusPending
|
||||
@ -2818,7 +2817,7 @@ func (r *ClickHouseReader) ListErrors(ctx context.Context, queryParams *model.Li
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
return &getErrorResponses, nil
|
||||
@ -2855,7 +2854,7 @@ func (r *ClickHouseReader) CountErrors(ctx context.Context, queryParams *model.C
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return 0, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return 0, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
return errorCount, nil
|
||||
@ -2877,7 +2876,7 @@ func (r *ClickHouseReader) GetErrorFromErrorID(ctx context.Context, queryParams
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
if len(getErrorWithSpanReponse) > 0 {
|
||||
@ -2901,7 +2900,7 @@ func (r *ClickHouseReader) GetErrorFromGroupID(ctx context.Context, queryParams
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
if len(getErrorWithSpanReponse) > 0 {
|
||||
@ -2949,7 +2948,7 @@ func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
if len(getNextErrorIDReponse) == 0 {
|
||||
zap.L().Info("NextErrorID not found")
|
||||
@ -2970,7 +2969,7 @@ func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
if len(getNextErrorIDReponse) == 0 {
|
||||
var getNextErrorIDReponse []model.NextPrevErrorIDsDBResponse
|
||||
@ -2984,7 +2983,7 @@ func (r *ClickHouseReader) getNextErrorID(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
if len(getNextErrorIDReponse) == 0 {
|
||||
@ -3018,7 +3017,7 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
if len(getPrevErrorIDReponse) == 0 {
|
||||
zap.L().Info("PrevErrorID not found")
|
||||
@ -3039,7 +3038,7 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
if len(getPrevErrorIDReponse) == 0 {
|
||||
var getPrevErrorIDReponse []model.NextPrevErrorIDsDBResponse
|
||||
@ -3053,7 +3052,7 @@ func (r *ClickHouseReader) getPrevErrorID(ctx context.Context, queryParams *mode
|
||||
|
||||
if err != nil {
|
||||
zap.L().Error("Error in processing sql query", zap.Error(err))
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing sql query")}
|
||||
return "", time.Time{}, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")}
|
||||
}
|
||||
|
||||
if len(getPrevErrorIDReponse) == 0 {
|
||||
@ -3414,7 +3413,7 @@ func isDashboardWithPanelAndName(data map[string]interface{}) bool {
|
||||
if ok && title != "Sample Title" {
|
||||
isDashboardName = true
|
||||
}
|
||||
widgets, ok := data["widgets"].(interface{})
|
||||
widgets, ok := data["widgets"]
|
||||
if ok && isDashboardName {
|
||||
data, ok := widgets.([]interface{})
|
||||
if ok && len(data) > 0 {
|
||||
@ -3429,7 +3428,7 @@ func countPanelsInDashboard(data map[string]interface{}) model.DashboardsInfo {
|
||||
var logsPanelCount, tracesPanelCount, metricsPanelCount int
|
||||
// totalPanels := 0
|
||||
if data != nil && data["widgets"] != nil {
|
||||
widgets, ok := data["widgets"].(interface{})
|
||||
widgets, ok := data["widgets"]
|
||||
if ok {
|
||||
data, ok := widgets.([]interface{})
|
||||
if ok {
|
||||
@ -3437,9 +3436,9 @@ func countPanelsInDashboard(data map[string]interface{}) model.DashboardsInfo {
|
||||
sData, ok := widget.(map[string]interface{})
|
||||
if ok && sData["query"] != nil {
|
||||
// totalPanels++
|
||||
query, ok := sData["query"].(interface{}).(map[string]interface{})
|
||||
query, ok := sData["query"].(map[string]interface{})
|
||||
if ok && query["queryType"] == "builder" && query["builder"] != nil {
|
||||
builderData, ok := query["builder"].(interface{}).(map[string]interface{})
|
||||
builderData, ok := query["builder"].(map[string]interface{})
|
||||
if ok && builderData["queryData"] != nil {
|
||||
builderQueryData, ok := builderData["queryData"].([]interface{})
|
||||
if ok {
|
||||
@ -3583,7 +3582,7 @@ func isSelectedField(tableStatement string, field model.LogField) bool {
|
||||
// in case of attributes and resources, if there is a materialized column present then it is selected
|
||||
// TODO: handle partial change complete eg:- index is removed but materialized column is still present
|
||||
name := utils.GetClickhouseColumnName(field.Type, field.DataType, field.Name)
|
||||
return strings.Contains(tableStatement, fmt.Sprintf("%s", name))
|
||||
return strings.Contains(tableStatement, name)
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
|
||||
@ -4587,7 +4586,7 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
|
||||
|
||||
func logComment(ctx context.Context) string {
|
||||
// Get the key-value pairs from context for log comment
|
||||
kv := ctx.Value("log_comment")
|
||||
kv := ctx.Value(common.LogCommentKey)
|
||||
if kv == nil {
|
||||
return ""
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
)
|
||||
|
||||
type ClickhouseQuerySettings struct {
|
||||
@ -65,7 +66,7 @@ func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query
|
||||
|
||||
func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string {
|
||||
// Get the key-value pairs from context for log comment
|
||||
kv := ctx.Value("log_comment")
|
||||
kv := ctx.Value(common.LogCommentKey)
|
||||
if kv == nil {
|
||||
return ""
|
||||
}
|
||||
|
@ -706,7 +706,7 @@ func countTraceAndLogsPanel(data map[string]interface{}) (int64, int64) {
|
||||
count := int64(0)
|
||||
totalPanels := int64(0)
|
||||
if data != nil && data["widgets"] != nil {
|
||||
widgets, ok := data["widgets"].(interface{})
|
||||
widgets, ok := data["widgets"]
|
||||
if ok {
|
||||
data, ok := widgets.([]interface{})
|
||||
if ok {
|
||||
@ -714,9 +714,9 @@ func countTraceAndLogsPanel(data map[string]interface{}) (int64, int64) {
|
||||
sData, ok := widget.(map[string]interface{})
|
||||
if ok && sData["query"] != nil {
|
||||
totalPanels++
|
||||
query, ok := sData["query"].(interface{}).(map[string]interface{})
|
||||
query, ok := sData["query"].(map[string]interface{})
|
||||
if ok && query["queryType"] == "builder" && query["builder"] != nil {
|
||||
builderData, ok := query["builder"].(interface{}).(map[string]interface{})
|
||||
builderData, ok := query["builder"].(map[string]interface{})
|
||||
if ok && builderData["queryData"] != nil {
|
||||
builderQueryData, ok := builderData["queryData"].([]interface{})
|
||||
if ok {
|
||||
@ -742,7 +742,7 @@ func countTraceAndLogsPanel(data map[string]interface{}) (int64, int64) {
|
||||
func getWidgetIds(data map[string]interface{}) []string {
|
||||
widgetIds := []string{}
|
||||
if data != nil && data["widgets"] != nil {
|
||||
widgets, ok := data["widgets"].(interface{})
|
||||
widgets, ok := data["widgets"]
|
||||
if ok {
|
||||
data, ok := widgets.([]interface{})
|
||||
if ok {
|
||||
|
@ -39,7 +39,6 @@ import (
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
||||
|
||||
"go.uber.org/multierr"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
@ -66,19 +65,14 @@ func NewRouter() *mux.Router {
|
||||
return mux.NewRouter().UseEncodedPath()
|
||||
}
|
||||
|
||||
// APIHandler implements the query service public API by registering routes at httpPrefix
|
||||
// APIHandler implements the query service public API
|
||||
type APIHandler struct {
|
||||
// queryService *querysvc.QueryService
|
||||
// queryParser queryParser
|
||||
basePath string
|
||||
apiPrefix string
|
||||
reader interfaces.Reader
|
||||
skipConfig *model.SkipConfig
|
||||
appDao dao.ModelDao
|
||||
alertManager am.Manager
|
||||
ruleManager *rules.Manager
|
||||
featureFlags interfaces.FeatureLookup
|
||||
ready func(http.HandlerFunc) http.HandlerFunc
|
||||
querier interfaces.Querier
|
||||
querierV2 interfaces.Querier
|
||||
queryBuilder *queryBuilder.QueryBuilder
|
||||
@ -194,8 +188,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {
|
||||
}
|
||||
aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags)
|
||||
|
||||
aH.ready = aH.testReady
|
||||
|
||||
dashboards.LoadDashboardFiles(aH.featureFlags)
|
||||
// if errReadingDashboards != nil {
|
||||
// return nil, errReadingDashboards
|
||||
@ -227,32 +219,6 @@ type structuredResponse struct {
|
||||
type structuredError struct {
|
||||
Code int `json:"code,omitempty"`
|
||||
Msg string `json:"msg"`
|
||||
// TraceID ui.TraceID `json:"traceID,omitempty"`
|
||||
}
|
||||
|
||||
var corsHeaders = map[string]string{
|
||||
"Access-Control-Allow-Headers": "Accept, Authorization, Content-Type, Origin",
|
||||
"Access-Control-Allow-Methods": "GET, OPTIONS",
|
||||
"Access-Control-Allow-Origin": "*",
|
||||
"Access-Control-Expose-Headers": "Date",
|
||||
}
|
||||
|
||||
// Enables cross-site script calls.
|
||||
func setCORS(w http.ResponseWriter) {
|
||||
for h, v := range corsHeaders {
|
||||
w.Header().Set(h, v)
|
||||
}
|
||||
}
|
||||
|
||||
type apiFunc func(r *http.Request) (interface{}, *model.ApiError, func())
|
||||
|
||||
// Checks if server is ready, calls f if it is, returns 503 if it is not.
|
||||
func (aH *APIHandler) testReady(f http.HandlerFunc) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
f(w, r)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
type ApiResponse struct {
|
||||
@ -660,6 +626,9 @@ func (aH *APIHandler) getDashboards(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
ic := aH.IntegrationsController
|
||||
installedIntegrationDashboards, err := ic.GetDashboardsForInstalledIntegrations(r.Context())
|
||||
if err != nil {
|
||||
zap.L().Error("failed to get dashboards for installed integrations", zap.Error(err))
|
||||
}
|
||||
allDashboards = append(allDashboards, installedIntegrationDashboards...)
|
||||
|
||||
tagsFromReq, ok := r.URL.Query()["tags"]
|
||||
@ -761,7 +730,7 @@ func prepareQuery(r *http.Request) (string, error) {
|
||||
|
||||
for _, op := range notAllowedOps {
|
||||
if strings.Contains(strings.ToLower(query), op) {
|
||||
return "", fmt.Errorf("Operation %s is not allowed", op)
|
||||
return "", fmt.Errorf("operation %s is not allowed", op)
|
||||
}
|
||||
}
|
||||
|
||||
@ -864,7 +833,6 @@ func (aH *APIHandler) saveAndReturn(w http.ResponseWriter, r *http.Request, sign
|
||||
return
|
||||
}
|
||||
aH.Respond(w, dashboard)
|
||||
return
|
||||
}
|
||||
|
||||
func (aH *APIHandler) createDashboardsTransform(w http.ResponseWriter, r *http.Request) {
|
||||
@ -872,6 +840,11 @@ func (aH *APIHandler) createDashboardsTransform(w http.ResponseWriter, r *http.R
|
||||
defer r.Body.Close()
|
||||
b, err := io.ReadAll(r.Body)
|
||||
|
||||
if err != nil {
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading request body")
|
||||
return
|
||||
}
|
||||
|
||||
var importData model.GrafanaJSON
|
||||
|
||||
err = json.Unmarshal(b, &importData)
|
||||
@ -1147,9 +1120,6 @@ func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
func (aH *APIHandler) queryRangeMetricsFromClickhouse(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
func (aH *APIHandler) queryRangeMetrics(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
query, apiErrorObj := parseQueryRangeRequest(r)
|
||||
@ -1187,11 +1157,11 @@ func (aH *APIHandler) queryRangeMetrics(w http.ResponseWriter, r *http.Request)
|
||||
if res.Err != nil {
|
||||
switch res.Err.(type) {
|
||||
case promql.ErrQueryCanceled:
|
||||
RespondError(w, &model.ApiError{model.ErrorCanceled, res.Err}, nil)
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorCanceled, Err: res.Err}, nil)
|
||||
case promql.ErrQueryTimeout:
|
||||
RespondError(w, &model.ApiError{model.ErrorTimeout, res.Err}, nil)
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorTimeout, Err: res.Err}, nil)
|
||||
}
|
||||
RespondError(w, &model.ApiError{model.ErrorExec, res.Err}, nil)
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorExec, Err: res.Err}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
@ -1242,11 +1212,11 @@ func (aH *APIHandler) queryMetrics(w http.ResponseWriter, r *http.Request) {
|
||||
if res.Err != nil {
|
||||
switch res.Err.(type) {
|
||||
case promql.ErrQueryCanceled:
|
||||
RespondError(w, &model.ApiError{model.ErrorCanceled, res.Err}, nil)
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorCanceled, Err: res.Err}, nil)
|
||||
case promql.ErrQueryTimeout:
|
||||
RespondError(w, &model.ApiError{model.ErrorTimeout, res.Err}, nil)
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorTimeout, Err: res.Err}, nil)
|
||||
}
|
||||
RespondError(w, &model.ApiError{model.ErrorExec, res.Err}, nil)
|
||||
RespondError(w, &model.ApiError{Typ: model.ErrorExec, Err: res.Err}, nil)
|
||||
}
|
||||
|
||||
response_data := &model.QueryData{
|
||||
@ -1869,7 +1839,7 @@ func (aH *APIHandler) getUser(w http.ResponseWriter, r *http.Request) {
|
||||
if user == nil {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorInternal,
|
||||
Err: errors.New("User not found"),
|
||||
Err: errors.New("user not found"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
@ -1936,7 +1906,7 @@ func (aH *APIHandler) deleteUser(w http.ResponseWriter, r *http.Request) {
|
||||
if user == nil {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorNotFound,
|
||||
Err: errors.New("User not found"),
|
||||
Err: errors.New("no user found"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
@ -2009,7 +1979,7 @@ func (aH *APIHandler) getRole(w http.ResponseWriter, r *http.Request) {
|
||||
if user == nil {
|
||||
RespondError(w, &model.ApiError{
|
||||
Typ: model.ErrorNotFound,
|
||||
Err: errors.New("No user found"),
|
||||
Err: errors.New("no user found"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
@ -2058,7 +2028,7 @@ func (aH *APIHandler) editRole(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
if len(adminUsers) == 1 {
|
||||
RespondError(w, &model.ApiError{
|
||||
Err: errors.New("Cannot demote the last admin"),
|
||||
Err: errors.New("cannot demote the last admin"),
|
||||
Typ: model.ErrorInternal}, nil)
|
||||
return
|
||||
}
|
||||
@ -2110,6 +2080,9 @@ func (aH *APIHandler) editOrg(w http.ResponseWriter, r *http.Request) {
|
||||
"organizationName": req.Name,
|
||||
}
|
||||
userEmail, err := auth.GetEmailFromJwt(r.Context())
|
||||
if err != nil {
|
||||
zap.L().Error("failed to get user email from jwt", zap.Error(err))
|
||||
}
|
||||
telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_ORG_SETTINGS, data, userEmail, true, false)
|
||||
|
||||
aH.WriteJSON(w, r, map[string]string{"data": "org updated successfully"})
|
||||
@ -2447,7 +2420,7 @@ func (ah *APIHandler) calculateLogsConnectionStatus(
|
||||
},
|
||||
},
|
||||
}
|
||||
queryRes, err, _ := ah.querier.QueryRange(
|
||||
queryRes, _, err := ah.querier.QueryRange(
|
||||
ctx, qrParams, map[string]v3.AttributeKey{},
|
||||
)
|
||||
if err != nil {
|
||||
@ -2967,185 +2940,6 @@ func (aH *APIHandler) autoCompleteAttributeValues(w http.ResponseWriter, r *http
|
||||
aH.Respond(w, response)
|
||||
}
|
||||
|
||||
func (aH *APIHandler) execClickHouseGraphQueries(ctx context.Context, queries map[string]string) ([]*v3.Result, error, map[string]error) {
|
||||
type channelResult struct {
|
||||
Series []*v3.Series
|
||||
Err error
|
||||
Name string
|
||||
Query string
|
||||
}
|
||||
|
||||
ch := make(chan channelResult, len(queries))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for name, query := range queries {
|
||||
wg.Add(1)
|
||||
go func(name, query string) {
|
||||
defer wg.Done()
|
||||
|
||||
seriesList, err := aH.reader.GetTimeSeriesResultV3(ctx, query)
|
||||
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
|
||||
return
|
||||
}
|
||||
ch <- channelResult{Series: seriesList, Name: name, Query: query}
|
||||
}(name, query)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
|
||||
var errs []error
|
||||
errQuriesByName := make(map[string]error)
|
||||
res := make([]*v3.Result, 0)
|
||||
// read values from the channel
|
||||
for r := range ch {
|
||||
if r.Err != nil {
|
||||
errs = append(errs, r.Err)
|
||||
errQuriesByName[r.Name] = r.Err
|
||||
continue
|
||||
}
|
||||
res = append(res, &v3.Result{
|
||||
QueryName: r.Name,
|
||||
Series: r.Series,
|
||||
})
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
|
||||
}
|
||||
return res, nil, nil
|
||||
}
|
||||
|
||||
func (aH *APIHandler) execClickHouseListQueries(ctx context.Context, queries map[string]string) ([]*v3.Result, error, map[string]error) {
|
||||
type channelResult struct {
|
||||
List []*v3.Row
|
||||
Err error
|
||||
Name string
|
||||
Query string
|
||||
}
|
||||
|
||||
ch := make(chan channelResult, len(queries))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for name, query := range queries {
|
||||
wg.Add(1)
|
||||
go func(name, query string) {
|
||||
defer wg.Done()
|
||||
rowList, err := aH.reader.GetListResultV3(ctx, query)
|
||||
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
|
||||
return
|
||||
}
|
||||
ch <- channelResult{List: rowList, Name: name, Query: query}
|
||||
}(name, query)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
|
||||
var errs []error
|
||||
errQuriesByName := make(map[string]error)
|
||||
res := make([]*v3.Result, 0)
|
||||
// read values from the channel
|
||||
for r := range ch {
|
||||
if r.Err != nil {
|
||||
errs = append(errs, r.Err)
|
||||
errQuriesByName[r.Name] = r.Err
|
||||
continue
|
||||
}
|
||||
res = append(res, &v3.Result{
|
||||
QueryName: r.Name,
|
||||
List: r.List,
|
||||
})
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
|
||||
}
|
||||
return res, nil, nil
|
||||
}
|
||||
|
||||
func (aH *APIHandler) execPromQueries(ctx context.Context, metricsQueryRangeParams *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]error) {
|
||||
type channelResult struct {
|
||||
Series []*v3.Series
|
||||
Err error
|
||||
Name string
|
||||
Query string
|
||||
}
|
||||
|
||||
ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeQuery.PromQueries))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for name, query := range metricsQueryRangeParams.CompositeQuery.PromQueries {
|
||||
if query.Disabled {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(name string, query *v3.PromQuery) {
|
||||
var seriesList []*v3.Series
|
||||
defer wg.Done()
|
||||
tmpl := template.New("promql-query")
|
||||
tmpl, tmplErr := tmpl.Parse(query.Query)
|
||||
if tmplErr != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
|
||||
return
|
||||
}
|
||||
var queryBuf bytes.Buffer
|
||||
tmplErr = tmpl.Execute(&queryBuf, metricsQueryRangeParams.Variables)
|
||||
if tmplErr != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
|
||||
return
|
||||
}
|
||||
query.Query = queryBuf.String()
|
||||
queryModel := model.QueryRangeParams{
|
||||
Start: time.UnixMilli(metricsQueryRangeParams.Start),
|
||||
End: time.UnixMilli(metricsQueryRangeParams.End),
|
||||
Step: time.Duration(metricsQueryRangeParams.Step * int64(time.Second)),
|
||||
Query: query.Query,
|
||||
}
|
||||
promResult, _, err := aH.reader.GetQueryRangeResult(ctx, &queryModel)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query.Query}
|
||||
return
|
||||
}
|
||||
matrix, _ := promResult.Matrix()
|
||||
for _, v := range matrix {
|
||||
var s v3.Series
|
||||
s.Labels = v.Metric.Copy().Map()
|
||||
for _, p := range v.Floats {
|
||||
s.Points = append(s.Points, v3.Point{Timestamp: p.T, Value: p.F})
|
||||
}
|
||||
seriesList = append(seriesList, &s)
|
||||
}
|
||||
ch <- channelResult{Series: seriesList, Name: name, Query: query.Query}
|
||||
}(name, query)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
|
||||
var errs []error
|
||||
errQuriesByName := make(map[string]error)
|
||||
res := make([]*v3.Result, 0)
|
||||
// read values from the channel
|
||||
for r := range ch {
|
||||
if r.Err != nil {
|
||||
errs = append(errs, r.Err)
|
||||
errQuriesByName[r.Name] = r.Err
|
||||
continue
|
||||
}
|
||||
res = append(res, &v3.Result{
|
||||
QueryName: r.Name,
|
||||
Series: r.Series,
|
||||
})
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
|
||||
}
|
||||
return res, nil, nil
|
||||
}
|
||||
|
||||
func (aH *APIHandler) getLogFieldsV3(ctx context.Context, queryRangeParams *v3.QueryRangeParamsV3) (map[string]v3.AttributeKey, error) {
|
||||
data := map[string]v3.AttributeKey{}
|
||||
for _, query := range queryRangeParams.CompositeQuery.BuilderQueries {
|
||||
@ -3254,7 +3048,7 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que
|
||||
}
|
||||
}
|
||||
|
||||
result, err, errQuriesByName = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys)
|
||||
result, errQuriesByName, err = aH.querier.QueryRange(ctx, queryRangeParams, spanKeys)
|
||||
|
||||
if err != nil {
|
||||
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
@ -3510,7 +3304,7 @@ func (aH *APIHandler) queryRangeV4(ctx context.Context, queryRangeParams *v3.Que
|
||||
}
|
||||
}
|
||||
|
||||
result, err, errQuriesByName = aH.querierV2.QueryRange(ctx, queryRangeParams, spanKeys)
|
||||
result, errQuriesByName, err = aH.querierV2.QueryRange(ctx, queryRangeParams, spanKeys)
|
||||
|
||||
if err != nil {
|
||||
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
|
||||
|
@ -67,7 +67,7 @@ func TestPrepareQuery(t *testing.T) {
|
||||
Query: "ALTER TABLE signoz_table DELETE where true",
|
||||
},
|
||||
expectedErr: true,
|
||||
errMsg: "Operation alter table is not allowed",
|
||||
errMsg: "operation alter table is not allowed",
|
||||
},
|
||||
{
|
||||
name: "query text produces template exec error",
|
||||
|
@ -108,7 +108,7 @@ func (c *Controller) Uninstall(
|
||||
) *model.ApiError {
|
||||
if len(req.IntegrationId) < 1 {
|
||||
return model.BadRequest(fmt.Errorf(
|
||||
"integration_id is required.",
|
||||
"integration_id is required",
|
||||
))
|
||||
}
|
||||
|
||||
|
@ -11,7 +11,7 @@ import (
|
||||
|
||||
func InitSqliteDBIfNeeded(db *sqlx.DB) error {
|
||||
if db == nil {
|
||||
return fmt.Errorf("db is required.")
|
||||
return fmt.Errorf("db is required")
|
||||
}
|
||||
|
||||
createTablesStatements := `
|
||||
|
@ -177,7 +177,7 @@ func (r *Repo) GetPipeline(
|
||||
|
||||
if len(pipelines) == 0 {
|
||||
zap.L().Warn("No row found for ingestion pipeline id", zap.String("id", id))
|
||||
return nil, model.NotFoundError(fmt.Errorf("No row found for ingestion pipeline id %v", id))
|
||||
return nil, model.NotFoundError(fmt.Errorf("no row found for ingestion pipeline id %v", id))
|
||||
}
|
||||
|
||||
if len(pipelines) == 1 {
|
||||
|
@ -93,7 +93,7 @@ func (p *PostablePipeline) IsValid() error {
|
||||
|
||||
func isValidOperator(op PipelineOperator) error {
|
||||
if op.ID == "" {
|
||||
return errors.New("PipelineOperator.ID is required.")
|
||||
return errors.New("PipelineOperator.ID is required")
|
||||
}
|
||||
|
||||
switch op.Type {
|
||||
@ -204,7 +204,7 @@ func isValidOperator(op PipelineOperator) error {
|
||||
}
|
||||
|
||||
validMappingLevels := []string{"trace", "debug", "info", "warn", "error", "fatal"}
|
||||
for k, _ := range op.SeverityMapping {
|
||||
for k := range op.SeverityMapping {
|
||||
if !slices.Contains(validMappingLevels, strings.ToLower(k)) {
|
||||
return fmt.Errorf("%s is not a valid severity in processor %s", k, op.ID)
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package logparsingpipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
@ -74,9 +73,9 @@ func SimulatePipelinesProcessing(
|
||||
timeout,
|
||||
)
|
||||
if apiErr != nil {
|
||||
return nil, collectorErrs, model.WrapApiError(apiErr, fmt.Sprintf(
|
||||
return nil, collectorErrs, model.WrapApiError(apiErr,
|
||||
"could not simulate log pipelines processing.\nCollector errors",
|
||||
))
|
||||
)
|
||||
}
|
||||
|
||||
outputSignozLogs := PLogsToSignozLogs(outputPLogs)
|
||||
|
@ -225,15 +225,15 @@ func makeTestSignozLog(
|
||||
}
|
||||
|
||||
for k, v := range attributes {
|
||||
switch v.(type) {
|
||||
switch v := v.(type) {
|
||||
case bool:
|
||||
testLog.Attributes_bool[k] = v.(bool)
|
||||
testLog.Attributes_bool[k] = v
|
||||
case string:
|
||||
testLog.Attributes_string[k] = v.(string)
|
||||
testLog.Attributes_string[k] = v
|
||||
case int:
|
||||
testLog.Attributes_int64[k] = int64(v.(int))
|
||||
testLog.Attributes_int64[k] = int64(v)
|
||||
case float64:
|
||||
testLog.Attributes_float64[k] = v.(float64)
|
||||
testLog.Attributes_float64[k] = v
|
||||
default:
|
||||
panic(fmt.Sprintf("found attribute value of unsupported type %T in test log", v))
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) {
|
||||
|
||||
_, err := db.Exec(tableSchema)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error in creating agents table: %s", err.Error())
|
||||
return nil, fmt.Errorf("error in creating agents table: %s", err.Error())
|
||||
}
|
||||
|
||||
AllAgents = Agents{
|
||||
|
@ -14,10 +14,8 @@ import (
|
||||
var opAmpServer *Server
|
||||
|
||||
type Server struct {
|
||||
server server.OpAMPServer
|
||||
agents *model.Agents
|
||||
logger *zap.Logger
|
||||
capabilities int32
|
||||
server server.OpAMPServer
|
||||
agents *model.Agents
|
||||
|
||||
agentConfigProvider AgentConfigProvider
|
||||
|
||||
@ -98,8 +96,7 @@ func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer
|
||||
)
|
||||
}
|
||||
|
||||
var response *protobufs.ServerToAgent
|
||||
response = &protobufs.ServerToAgent{
|
||||
response := &protobufs.ServerToAgent{
|
||||
InstanceUid: agentID,
|
||||
Capabilities: uint64(capabilities),
|
||||
}
|
||||
|
@ -21,8 +21,8 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/app/metrics"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/queryBuilder"
|
||||
"go.signoz.io/signoz/pkg/query-service/auth"
|
||||
baseconstants "go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
baseconstants "go.signoz.io/signoz/pkg/query-service/constants"
|
||||
"go.signoz.io/signoz/pkg/query-service/model"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
"go.signoz.io/signoz/pkg/query-service/postprocess"
|
||||
@ -32,19 +32,6 @@ import (
|
||||
|
||||
var allowedFunctions = []string{"count", "ratePerSec", "sum", "avg", "min", "max", "p50", "p90", "p95", "p99"}
|
||||
|
||||
func parseUser(r *http.Request) (*model.User, error) {
|
||||
|
||||
var user model.User
|
||||
if err := json.NewDecoder(r.Body).Decode(&user); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(user.Email) == 0 {
|
||||
return nil, fmt.Errorf("email field not found")
|
||||
}
|
||||
|
||||
return &user, nil
|
||||
}
|
||||
|
||||
func parseGetTopOperationsRequest(r *http.Request) (*model.GetTopOperationsParams, error) {
|
||||
var postData *model.GetTopOperationsParams
|
||||
err := json.NewDecoder(r.Body).Decode(&postData)
|
||||
@ -394,7 +381,7 @@ func parseFilteredSpanAggregatesRequest(r *http.Request) (*model.GetFilteredSpan
|
||||
return nil, errors.New("function param missing in query")
|
||||
} else {
|
||||
if !DoesExistInSlice(function, allowedFunctions) {
|
||||
return nil, errors.New(fmt.Sprintf("given function: %s is not allowed in query", function))
|
||||
return nil, fmt.Errorf("given function: %s is not allowed in query", function)
|
||||
}
|
||||
}
|
||||
|
||||
@ -549,11 +536,11 @@ func parseListErrorsRequest(r *http.Request) (*model.ListErrorsParams, error) {
|
||||
}
|
||||
|
||||
if len(postData.Order) > 0 && !DoesExistInSlice(postData.Order, allowedOrderDirections) {
|
||||
return nil, errors.New(fmt.Sprintf("given order: %s is not allowed in query", postData.Order))
|
||||
return nil, fmt.Errorf("given order: %s is not allowed in query", postData.Order)
|
||||
}
|
||||
|
||||
if len(postData.Order) > 0 && !DoesExistInSlice(postData.OrderParam, allowedOrderParams) {
|
||||
return nil, errors.New(fmt.Sprintf("given orderParam: %s is not allowed in query", postData.OrderParam))
|
||||
return nil, fmt.Errorf("given orderParam: %s is not allowed in query", postData.OrderParam)
|
||||
}
|
||||
|
||||
return postData, nil
|
||||
@ -659,29 +646,6 @@ func parseTime(param string, r *http.Request) (*time.Time, error) {
|
||||
|
||||
}
|
||||
|
||||
func parseTimeMinusBuffer(param string, r *http.Request) (*time.Time, error) {
|
||||
|
||||
timeStr := r.URL.Query().Get(param)
|
||||
if len(timeStr) == 0 {
|
||||
return nil, fmt.Errorf("%s param missing in query", param)
|
||||
}
|
||||
|
||||
timeUnix, err := strconv.ParseInt(timeStr, 10, 64)
|
||||
if err != nil || len(timeStr) == 0 {
|
||||
return nil, fmt.Errorf("%s param is not in correct timestamp format", param)
|
||||
}
|
||||
|
||||
timeUnixNow := time.Now().UnixNano()
|
||||
if timeUnix > timeUnixNow-30000000000 {
|
||||
timeUnix = timeUnix - 30000000000
|
||||
}
|
||||
|
||||
timeFmt := time.Unix(0, timeUnix)
|
||||
|
||||
return &timeFmt, nil
|
||||
|
||||
}
|
||||
|
||||
func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
|
||||
// make sure either of the query params are present
|
||||
@ -702,7 +666,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
// Validate the TTL duration.
|
||||
durationParsed, err := time.ParseDuration(delDuration)
|
||||
if err != nil || durationParsed.Seconds() <= 0 {
|
||||
return nil, fmt.Errorf("Not a valid TTL duration %v", delDuration)
|
||||
return nil, fmt.Errorf("not a valid TTL duration %v", delDuration)
|
||||
}
|
||||
|
||||
var toColdParsed time.Duration
|
||||
@ -711,10 +675,10 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
|
||||
if len(coldStorage) > 0 {
|
||||
toColdParsed, err = time.ParseDuration(toColdDuration)
|
||||
if err != nil || toColdParsed.Seconds() <= 0 {
|
||||
return nil, fmt.Errorf("Not a valid toCold TTL duration %v", toColdDuration)
|
||||
return nil, fmt.Errorf("not a valid toCold TTL duration %v", toColdDuration)
|
||||
}
|
||||
if toColdParsed.Seconds() != 0 && toColdParsed.Seconds() >= durationParsed.Seconds() {
|
||||
return nil, fmt.Errorf("Delete TTL should be greater than cold storage move TTL.")
|
||||
return nil, fmt.Errorf("delete TTL should be greater than cold storage move TTL")
|
||||
}
|
||||
}
|
||||
|
||||
@ -842,15 +806,6 @@ func parseChangePasswordRequest(r *http.Request) (*model.ChangePasswordRequest,
|
||||
return &req, nil
|
||||
}
|
||||
|
||||
func parseFilterSet(r *http.Request) (*model.FilterSet, error) {
|
||||
var filterSet model.FilterSet
|
||||
err := json.NewDecoder(r.Body).Decode(&filterSet)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &filterSet, nil
|
||||
}
|
||||
|
||||
func parseAggregateAttributeRequest(r *http.Request) (*v3.AggregateAttributeRequest, error) {
|
||||
var req v3.AggregateAttributeRequest
|
||||
|
||||
|
@ -284,7 +284,7 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
|
||||
return mergedSeries
|
||||
}
|
||||
|
||||
func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
|
||||
|
||||
cacheKeys := q.keyGenerator.GenerateKeys(params)
|
||||
|
||||
@ -327,10 +327,10 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
|
||||
err = fmt.Errorf("error in builder queries")
|
||||
}
|
||||
|
||||
return results, err, errQueriesByName
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
||||
channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries))
|
||||
var wg sync.WaitGroup
|
||||
cacheKeys := q.keyGenerator.GenerateKeys(params)
|
||||
@ -411,10 +411,10 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
|
||||
err = fmt.Errorf("error in prom queries")
|
||||
}
|
||||
|
||||
return results, err, errQueriesByName
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
||||
channelResults := make(chan channelResult, len(params.CompositeQuery.ClickHouseQueries))
|
||||
var wg sync.WaitGroup
|
||||
for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries {
|
||||
@ -451,15 +451,15 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
|
||||
if len(errs) > 0 {
|
||||
err = fmt.Errorf("error in clickhouse queries")
|
||||
}
|
||||
return results, err, errQueriesByName
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
|
||||
|
||||
queries, err := q.builder.PrepareQueries(params, keys)
|
||||
|
||||
if err != nil {
|
||||
return nil, err, nil
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
ch := make(chan channelResult, len(queries))
|
||||
@ -498,12 +498,12 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
|
||||
})
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
|
||||
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
|
||||
}
|
||||
return res, nil, nil
|
||||
}
|
||||
|
||||
func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
|
||||
var results []*v3.Result
|
||||
var err error
|
||||
var errQueriesByName map[string]error
|
||||
@ -511,9 +511,9 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3,
|
||||
switch params.CompositeQuery.QueryType {
|
||||
case v3.QueryTypeBuilder:
|
||||
if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace {
|
||||
results, err, errQueriesByName = q.runBuilderListQueries(ctx, params, keys)
|
||||
results, errQueriesByName, err = q.runBuilderListQueries(ctx, params, keys)
|
||||
} else {
|
||||
results, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys)
|
||||
results, errQueriesByName, err = q.runBuilderQueries(ctx, params, keys)
|
||||
}
|
||||
// in builder query, the only errors we expose are the ones that exceed the resource limits
|
||||
// everything else is internal error as they are not actionable by the user
|
||||
@ -523,9 +523,9 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3,
|
||||
}
|
||||
}
|
||||
case v3.QueryTypePromQL:
|
||||
results, err, errQueriesByName = q.runPromQueries(ctx, params)
|
||||
results, errQueriesByName, err = q.runPromQueries(ctx, params)
|
||||
case v3.QueryTypeClickHouseSQL:
|
||||
results, err, errQueriesByName = q.runClickHouseQueries(ctx, params)
|
||||
results, errQueriesByName, err = q.runClickHouseQueries(ctx, params)
|
||||
default:
|
||||
err = fmt.Errorf("invalid query type")
|
||||
}
|
||||
@ -540,7 +540,7 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3,
|
||||
}
|
||||
}
|
||||
|
||||
return results, err, errQueriesByName
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) QueriesExecuted() []string {
|
||||
|
@ -579,7 +579,7 @@ func TestQueryRange(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, param := range params {
|
||||
_, err, errByName := q.QueryRange(context.Background(), param, nil)
|
||||
_, errByName, err := q.QueryRange(context.Background(), param, nil)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got %s", err)
|
||||
}
|
||||
@ -688,7 +688,7 @@ func TestQueryRangeValueType(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, param := range params {
|
||||
_, err, errByName := q.QueryRange(context.Background(), param, nil)
|
||||
_, errByName, err := q.QueryRange(context.Background(), param, nil)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got %s", err)
|
||||
}
|
||||
@ -741,7 +741,7 @@ func TestQueryRangeTimeShift(t *testing.T) {
|
||||
expectedTimeRangeInQueryString := fmt.Sprintf("timestamp >= %d AND timestamp <= %d", (1675115596722-86400*1000)*1000000, ((1675115596722+120*60*1000)-86400*1000)*1000000)
|
||||
|
||||
for i, param := range params {
|
||||
_, err, errByName := q.QueryRange(context.Background(), param, nil)
|
||||
_, errByName, err := q.QueryRange(context.Background(), param, nil)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got %s", err)
|
||||
}
|
||||
@ -839,7 +839,7 @@ func TestQueryRangeTimeShiftWithCache(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, param := range params {
|
||||
_, err, errByName := q.QueryRange(context.Background(), param, nil)
|
||||
_, errByName, err := q.QueryRange(context.Background(), param, nil)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got %s", err)
|
||||
}
|
||||
@ -939,7 +939,7 @@ func TestQueryRangeTimeShiftWithLimitAndCache(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, param := range params {
|
||||
_, err, errByName := q.QueryRange(context.Background(), param, nil)
|
||||
_, errByName, err := q.QueryRange(context.Background(), param, nil)
|
||||
if err != nil {
|
||||
t.Errorf("expected no error, got %s", err)
|
||||
}
|
||||
|
@ -322,93 +322,3 @@ func (q *querier) runBuilderQuery(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *querier) runBuilderExpression(
|
||||
ctx context.Context,
|
||||
builderQuery *v3.BuilderQuery,
|
||||
params *v3.QueryRangeParamsV3,
|
||||
keys map[string]v3.AttributeKey,
|
||||
cacheKeys map[string]string,
|
||||
ch chan channelResult,
|
||||
wg *sync.WaitGroup,
|
||||
) {
|
||||
defer wg.Done()
|
||||
|
||||
queryName := builderQuery.QueryName
|
||||
|
||||
queries, err := q.builder.PrepareQueries(params, keys)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: "", Series: nil}
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := cacheKeys[queryName]; !ok {
|
||||
query := queries[queryName]
|
||||
series, err := q.execClickHouseQuery(ctx, query)
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: series}
|
||||
return
|
||||
}
|
||||
|
||||
cacheKey := cacheKeys[queryName]
|
||||
var cachedData []byte
|
||||
if !params.NoCache && q.cache != nil {
|
||||
var retrieveStatus status.RetrieveStatus
|
||||
data, retrieveStatus, err := q.cache.Retrieve(cacheKey, true)
|
||||
zap.L().Info("cache retrieve status", zap.String("status", retrieveStatus.String()))
|
||||
if err == nil {
|
||||
cachedData = data
|
||||
}
|
||||
}
|
||||
misses := q.findMissingTimeRanges(params.Start, params.End, params.Step, cachedData)
|
||||
missedSeries := make([]*v3.Series, 0)
|
||||
cachedSeries := make([]*v3.Series, 0)
|
||||
for _, miss := range misses {
|
||||
missQueries, _ := q.builder.PrepareQueries(&v3.QueryRangeParamsV3{
|
||||
Start: miss.start,
|
||||
End: miss.end,
|
||||
Step: params.Step,
|
||||
NoCache: params.NoCache,
|
||||
CompositeQuery: params.CompositeQuery,
|
||||
Variables: params.Variables,
|
||||
}, keys)
|
||||
query := missQueries[queryName]
|
||||
series, err := q.execClickHouseQuery(ctx, query)
|
||||
if err != nil {
|
||||
ch <- channelResult{Err: err, Name: queryName, Query: query, Series: nil}
|
||||
return
|
||||
}
|
||||
missedSeries = append(missedSeries, series...)
|
||||
}
|
||||
if err := json.Unmarshal(cachedData, &cachedSeries); err != nil && cachedData != nil {
|
||||
zap.L().Error("error unmarshalling cached data", zap.Error(err))
|
||||
}
|
||||
mergedSeries := mergeSerieses(cachedSeries, missedSeries)
|
||||
|
||||
var mergedSeriesData []byte
|
||||
missedSeriesLen := len(missedSeries)
|
||||
var marshallingErr error
|
||||
if missedSeriesLen > 0 && !params.NoCache && q.cache != nil {
|
||||
// caching the data
|
||||
mergedSeriesData, marshallingErr = json.Marshal(mergedSeries)
|
||||
if marshallingErr != nil {
|
||||
zap.S().Error("error marshalling merged series", zap.Error(marshallingErr))
|
||||
}
|
||||
}
|
||||
|
||||
// response doesn't need everything
|
||||
filterCachedPoints(mergedSeries, params.Start, params.End)
|
||||
|
||||
ch <- channelResult{
|
||||
Err: nil,
|
||||
Name: queryName,
|
||||
Series: mergedSeries,
|
||||
}
|
||||
// Cache the seriesList for future queries
|
||||
if len(missedSeries) > 0 && !params.NoCache && q.cache != nil && marshallingErr == nil {
|
||||
err = q.cache.Store(cacheKey, mergedSeriesData, time.Hour)
|
||||
if err != nil {
|
||||
zap.L().Error("error storing merged series", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -282,7 +282,7 @@ func mergeSerieses(cachedSeries, missedSeries []*v3.Series) []*v3.Series {
|
||||
return mergedSeries
|
||||
}
|
||||
|
||||
func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
|
||||
|
||||
cacheKeys := q.keyGenerator.GenerateKeys(params)
|
||||
|
||||
@ -320,10 +320,10 @@ func (q *querier) runBuilderQueries(ctx context.Context, params *v3.QueryRangePa
|
||||
err = fmt.Errorf("error in builder queries")
|
||||
}
|
||||
|
||||
return results, err, errQueriesByName
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
||||
channelResults := make(chan channelResult, len(params.CompositeQuery.PromQueries))
|
||||
var wg sync.WaitGroup
|
||||
cacheKeys := q.keyGenerator.GenerateKeys(params)
|
||||
@ -404,10 +404,10 @@ func (q *querier) runPromQueries(ctx context.Context, params *v3.QueryRangeParam
|
||||
err = fmt.Errorf("error in prom queries")
|
||||
}
|
||||
|
||||
return results, err, errQueriesByName
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) {
|
||||
channelResults := make(chan channelResult, len(params.CompositeQuery.ClickHouseQueries))
|
||||
var wg sync.WaitGroup
|
||||
for queryName, clickHouseQuery := range params.CompositeQuery.ClickHouseQueries {
|
||||
@ -444,15 +444,15 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang
|
||||
if len(errs) > 0 {
|
||||
err = fmt.Errorf("error in clickhouse queries")
|
||||
}
|
||||
return results, err, errQueriesByName
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
|
||||
|
||||
queries, err := q.builder.PrepareQueries(params, keys)
|
||||
|
||||
if err != nil {
|
||||
return nil, err, nil
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
ch := make(chan channelResult, len(queries))
|
||||
@ -491,12 +491,12 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRan
|
||||
})
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
return nil, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...)), errQuriesByName
|
||||
return nil, errQuriesByName, fmt.Errorf("encountered multiple errors: %s", multierr.Combine(errs...))
|
||||
}
|
||||
return res, nil, nil
|
||||
}
|
||||
|
||||
func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]error) {
|
||||
func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error) {
|
||||
var results []*v3.Result
|
||||
var err error
|
||||
var errQueriesByName map[string]error
|
||||
@ -504,9 +504,9 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3,
|
||||
switch params.CompositeQuery.QueryType {
|
||||
case v3.QueryTypeBuilder:
|
||||
if params.CompositeQuery.PanelType == v3.PanelTypeList || params.CompositeQuery.PanelType == v3.PanelTypeTrace {
|
||||
results, err, errQueriesByName = q.runBuilderListQueries(ctx, params, keys)
|
||||
results, errQueriesByName, err = q.runBuilderListQueries(ctx, params, keys)
|
||||
} else {
|
||||
results, err, errQueriesByName = q.runBuilderQueries(ctx, params, keys)
|
||||
results, errQueriesByName, err = q.runBuilderQueries(ctx, params, keys)
|
||||
}
|
||||
// in builder query, the only errors we expose are the ones that exceed the resource limits
|
||||
// everything else is internal error as they are not actionable by the user
|
||||
@ -516,9 +516,9 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3,
|
||||
}
|
||||
}
|
||||
case v3.QueryTypePromQL:
|
||||
results, err, errQueriesByName = q.runPromQueries(ctx, params)
|
||||
results, errQueriesByName, err = q.runPromQueries(ctx, params)
|
||||
case v3.QueryTypeClickHouseSQL:
|
||||
results, err, errQueriesByName = q.runClickHouseQueries(ctx, params)
|
||||
results, errQueriesByName, err = q.runClickHouseQueries(ctx, params)
|
||||
default:
|
||||
err = fmt.Errorf("invalid query type")
|
||||
}
|
||||
@ -533,7 +533,7 @@ func (q *querier) QueryRange(ctx context.Context, params *v3.QueryRangeParamsV3,
|
||||
}
|
||||
}
|
||||
|
||||
return results, err, errQueriesByName
|
||||
return results, errQueriesByName, err
|
||||
}
|
||||
|
||||
func (q *querier) QueriesExecuted() []string {
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
|
||||
"go.signoz.io/signoz/pkg/query-service/app/opamp"
|
||||
opAmpModel "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/app/explorer"
|
||||
@ -66,12 +67,8 @@ type ServerOptions struct {
|
||||
|
||||
// Server runs HTTP, Mux and a grpc server
|
||||
type Server struct {
|
||||
// logger *zap.Logger
|
||||
// tracer opentracing.Tracer // TODO make part of flags.Service
|
||||
serverOptions *ServerOptions
|
||||
conn net.Listener
|
||||
ruleManager *rules.Manager
|
||||
separatePorts bool
|
||||
|
||||
// public http router
|
||||
httpConn net.Listener
|
||||
@ -128,7 +125,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
|
||||
go clickhouseReader.Start(readerReady)
|
||||
reader = clickhouseReader
|
||||
} else {
|
||||
return nil, fmt.Errorf("Storage type: %s is not supported in query service", storage)
|
||||
return nil, fmt.Errorf("storage type: %s is not supported in query service", storage)
|
||||
}
|
||||
skipConfig := &model.SkipConfig{}
|
||||
if serverOptions.SkipTopLvlOpsPath != "" {
|
||||
@ -303,7 +300,7 @@ func loggingMiddleware(next http.Handler) http.Handler {
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.L().Info(path+"\ttimeTaken:"+time.Now().Sub(startTime).String(), zap.Duration("timeTaken", time.Now().Sub(startTime)), zap.String("path", path))
|
||||
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path))
|
||||
})
|
||||
}
|
||||
|
||||
@ -361,7 +358,7 @@ func LogCommentEnricher(next http.Handler) http.Handler {
|
||||
"servicesTab": tab,
|
||||
}
|
||||
|
||||
r = r.WithContext(context.WithValue(r.Context(), "log_comment", kvs))
|
||||
r = r.WithContext(context.WithValue(r.Context(), common.LogCommentKey, kvs))
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
@ -374,7 +371,7 @@ func loggingMiddlewarePrivate(next http.Handler) http.Handler {
|
||||
path, _ := route.GetPathTemplate()
|
||||
startTime := time.Now()
|
||||
next.ServeHTTP(w, r)
|
||||
zap.L().Info(path+"\tprivatePort: true \ttimeTaken"+time.Now().Sub(startTime).String(), zap.Duration("timeTaken", time.Now().Sub(startTime)), zap.String("path", path), zap.Bool("tprivatePort", true))
|
||||
zap.L().Info(path, zap.Duration("timeTaken", time.Since(startTime)), zap.String("path", path), zap.Bool("privatePort", true))
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -22,13 +22,18 @@ import (
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
)
|
||||
|
||||
type JwtContextKeyType string
|
||||
|
||||
const AccessJwtKey JwtContextKeyType = "accessJwt"
|
||||
const RefreshJwtKey JwtContextKeyType = "refreshJwt"
|
||||
|
||||
const (
|
||||
opaqueTokenSize = 16
|
||||
minimumPasswordLength = 8
|
||||
)
|
||||
|
||||
var (
|
||||
ErrorInvalidCreds = fmt.Errorf("Invalid credentials")
|
||||
ErrorInvalidCreds = fmt.Errorf("invalid credentials")
|
||||
)
|
||||
|
||||
type InviteEmailData struct {
|
||||
@ -129,7 +134,6 @@ func inviteEmail(req *model.InviteRequest, au *model.UserPayload, token string)
|
||||
zap.L().Error("failed to send email", zap.Error(err))
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// RevokeInvite is used to revoke the invitation for the given email.
|
||||
@ -488,10 +492,7 @@ func PasswordHash(pass string) (string, error) {
|
||||
// Checks if the given password results in the given hash.
|
||||
func passwordMatch(hash, password string) bool {
|
||||
err := bcrypt.CompareHashAndPassword([]byte(hash), []byte(password))
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func GenerateJWTForUser(user *model.User) (model.UserJwtObject, error) {
|
||||
|
@ -64,11 +64,11 @@ func AttachJwtToContext(ctx context.Context, r *http.Request) context.Context {
|
||||
return ctx
|
||||
}
|
||||
|
||||
return context.WithValue(ctx, "accessJwt", token)
|
||||
return context.WithValue(ctx, AccessJwtKey, token)
|
||||
}
|
||||
|
||||
func ExtractJwtFromContext(ctx context.Context) (string, bool) {
|
||||
jwtToken, ok := ctx.Value("accessJwt").(string)
|
||||
jwtToken, ok := ctx.Value(AccessJwtKey).(string)
|
||||
return jwtToken, ok
|
||||
}
|
||||
|
||||
|
@ -19,8 +19,6 @@ func isValidRole(role string) bool {
|
||||
switch role {
|
||||
case constants.AdminGroup, constants.EditorGroup, constants.ViewerGroup:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
5
pkg/query-service/common/ctx.go
Normal file
5
pkg/query-service/common/ctx.go
Normal file
@ -0,0 +1,5 @@
|
||||
package common
|
||||
|
||||
type LogCommentContextKeyType string
|
||||
|
||||
const LogCommentKey LogCommentContextKeyType = "logComment"
|
@ -90,7 +90,7 @@ func InitDB(dataSourceName string) (*ModelDaoSqlite, error) {
|
||||
|
||||
_, err = db.Exec(table_schema)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error in creating tables: %v", err.Error())
|
||||
return nil, fmt.Errorf("error in creating tables: %v", err.Error())
|
||||
}
|
||||
|
||||
mds := &ModelDaoSqlite{db: db}
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
|
||||
old_ctx "golang.org/x/net/context"
|
||||
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/log"
|
||||
"github.com/go-kit/log/level"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -107,7 +107,7 @@ type Reader interface {
|
||||
}
|
||||
|
||||
type Querier interface {
|
||||
QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Result, error, map[string]error)
|
||||
QueryRange(context.Context, *v3.QueryRangeParamsV3, map[string]v3.AttributeKey) ([]*v3.Result, map[string]error, error)
|
||||
|
||||
// test helpers
|
||||
QueriesExecuted() []string
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
"github.com/go-kit/log"
|
||||
pmodel "github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/promlog"
|
||||
plog "github.com/prometheus/common/promlog"
|
||||
pconfig "github.com/prometheus/prometheus/config"
|
||||
pql "github.com/prometheus/prometheus/promql"
|
||||
pstorage "github.com/prometheus/prometheus/storage"
|
||||
@ -40,7 +39,7 @@ func FromReader(ch interfaces.Reader) (*PqlEngine, error) {
|
||||
|
||||
func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) {
|
||||
|
||||
logLevel := plog.AllowedLevel{}
|
||||
logLevel := promlog.AllowedLevel{}
|
||||
logLevel.Set("debug")
|
||||
|
||||
allowedFormat := promlog.AllowedFormat{}
|
||||
@ -51,7 +50,7 @@ func NewPqlEngine(config *pconfig.Config) (*PqlEngine, error) {
|
||||
Format: &allowedFormat,
|
||||
}
|
||||
|
||||
logger := plog.New(&promlogConfig)
|
||||
logger := promlog.New(&promlogConfig)
|
||||
|
||||
opts := pql.EngineOpts{
|
||||
Logger: log.With(logger, "component", "promql evaluator"),
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/go-kit/log"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
plabels "github.com/prometheus/prometheus/model/labels"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -364,7 +365,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
"source": "alerts",
|
||||
"client": "query-service",
|
||||
}
|
||||
ctx = context.WithValue(ctx, "log_comment", kvs)
|
||||
ctx = context.WithValue(ctx, common.LogCommentKey, kvs)
|
||||
|
||||
_, err := rule.Eval(ctx, ts, g.opts.Queriers)
|
||||
if err != nil {
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
"go.signoz.io/signoz/pkg/query-service/common"
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/labels"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -345,7 +346,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
|
||||
"source": "alerts",
|
||||
"client": "query-service",
|
||||
}
|
||||
ctx = context.WithValue(ctx, "log_comment", kvs)
|
||||
ctx = context.WithValue(ctx, common.LogCommentKey, kvs)
|
||||
|
||||
_, err := rule.Eval(ctx, ts, g.opts.Queriers)
|
||||
if err != nil {
|
||||
|
@ -14,6 +14,8 @@ import (
|
||||
html_template "html/template"
|
||||
text_template "text/template"
|
||||
|
||||
"golang.org/x/text/cases"
|
||||
|
||||
"go.signoz.io/signoz/pkg/query-service/utils/times"
|
||||
)
|
||||
|
||||
@ -96,7 +98,7 @@ func NewTemplateExpander(
|
||||
return html_template.HTML(text)
|
||||
},
|
||||
"match": regexp.MatchString,
|
||||
"title": strings.Title,
|
||||
"title": cases.Title,
|
||||
"toUpper": strings.ToUpper,
|
||||
"toLower": strings.ToLower,
|
||||
"sortByLabel": func(label string, v tmplQueryResults) tmplQueryResults {
|
||||
|
@ -767,9 +767,9 @@ func (r *ThresholdRule) buildAndRunQuery(ctx context.Context, ts time.Time, ch c
|
||||
var errQuriesByName map[string]error
|
||||
|
||||
if r.version == "v4" {
|
||||
results, err, errQuriesByName = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{})
|
||||
results, errQuriesByName, err = r.querierV2.QueryRange(ctx, params, map[string]v3.AttributeKey{})
|
||||
} else {
|
||||
results, err, errQuriesByName = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{})
|
||||
results, errQuriesByName, err = r.querier.QueryRange(ctx, params, map[string]v3.AttributeKey{})
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -797,7 +797,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) {
|
||||
"service.name": "frontend",
|
||||
},
|
||||
LabelsArray: []map[string]string{
|
||||
map[string]string{
|
||||
{
|
||||
"service.name": "frontend",
|
||||
},
|
||||
},
|
||||
|
@ -197,8 +197,6 @@ func createTelemetry() {
|
||||
telemetry.minRandInt = 0
|
||||
telemetry.maxRandInt = int(1 / DEFAULT_SAMPLING)
|
||||
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled())
|
||||
|
||||
ticker := time.NewTicker(HEART_BEAT_DURATION)
|
||||
@ -207,6 +205,7 @@ func createTelemetry() {
|
||||
rateLimitTicker := time.NewTicker(RATE_LIMIT_CHECK_DURATION)
|
||||
|
||||
go func() {
|
||||
//lint:ignore S1000 false positive
|
||||
for {
|
||||
select {
|
||||
case <-rateLimitTicker.C:
|
||||
|
@ -31,7 +31,7 @@ func invite(t *testing.T, email string) *model.InviteResponse {
|
||||
}
|
||||
|
||||
func register(email, password, token string) (string, error) {
|
||||
q := endpoint + fmt.Sprintf("/api/v1/register")
|
||||
q := endpoint + "/api/v1/register"
|
||||
|
||||
req := auth.RegisterRequest{
|
||||
Email: email,
|
||||
@ -58,7 +58,7 @@ func register(email, password, token string) (string, error) {
|
||||
}
|
||||
|
||||
func login(email, password, refreshToken string) (*model.LoginResponse, error) {
|
||||
q := endpoint + fmt.Sprintf("/api/v1/login")
|
||||
q := endpoint + "/api/v1/login"
|
||||
|
||||
req := model.LoginRequest{
|
||||
Email: email,
|
||||
|
@ -645,6 +645,7 @@ func assertPipelinesRecommendedInRemoteConfig(
|
||||
}
|
||||
|
||||
_, expectedLogProcessorNames, err := logparsingpipeline.PreparePipelineProcessor(pipelines)
|
||||
require.NoError(t, err)
|
||||
require.Equal(
|
||||
t, expectedLogProcessorNames, collectorConfLogsPipelineProcNames,
|
||||
"config sent to opamp client doesn't contain expected log pipelines",
|
||||
|
@ -123,15 +123,15 @@ func makeTestSignozLog(
|
||||
}
|
||||
|
||||
for k, v := range attributes {
|
||||
switch v.(type) {
|
||||
switch v := v.(type) {
|
||||
case bool:
|
||||
testLog.Attributes_bool[k] = v.(bool)
|
||||
testLog.Attributes_bool[k] = v
|
||||
case string:
|
||||
testLog.Attributes_string[k] = v.(string)
|
||||
testLog.Attributes_string[k] = v
|
||||
case int:
|
||||
testLog.Attributes_int64[k] = int64(v.(int))
|
||||
testLog.Attributes_int64[k] = int64(v)
|
||||
case float64:
|
||||
testLog.Attributes_float64[k] = v.(float64)
|
||||
testLog.Attributes_float64[k] = v
|
||||
default:
|
||||
panic(fmt.Sprintf("found attribute value of unsupported type %T in test log", v))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user