mirror of
https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz
synced 2025-07-28 00:31:58 +08:00

* feat(organization): add hname and alias for organization * fix: boolean values are not shown in the list panel's column * fix: moved logic to component level * fix: added type * fix: added test cases * fix: added test cases * chore: update copy webpack plugin * Revert "fix: display same key with multiple data types in filter suggestions by enhancing the deduping logic (#7255)" This reverts commit 1e85981a17a8e715e948308d3e85072d976907d3. * fix: use query search v2 for traces data source to handle multiple data types for the same key * fix(QueryBuilderSearchV2): add user typed option if it doesn't exist in the payload * fix(QueryBuilderSearchV2): increase the height of search dropdown for non-logs data sources * fix: display span scope selector for trace data source * chore: remove the span scope selector from qb search v1 and move the component to search v2 * fix: write test to ensure that we display span scope selector for traces data source * fix: limit converting -> only to log data source * fix: don't display empty suggestion if only spaces are typed * chore: tests for span scope selector * chore: qb search flow (key, operator, value) test cases * refactor: fix the Maximum update depth reached issue while running tests * chore: overall improvements to span scope selector tests Resource attr filter: style fix and quick filter changes (#7691) * chore: resource attr filter init * chore: resource attr filter api integration * chore: operator config updated * chore: fliter show hide logic and styles * chore: add support for custom operator list to qb * chore: minor refactor * chore: minor code refactor * test: quick filters test suite added * test: quick filters test suite added * test: all errors test suite added * chore: style fix * test: all errors mock fix * chore: test case fix and mixpanel update * chore: color update * chore: minor refactor * chore: style fix * chore: set default query in exceptions tab * chore: style fix * chore: minor refactor * chore: minor refactor * chore: minor refactor * chore: test update * chore: fix filter header with no query name * fix: scroll fix * chore: add data source traces to quick filters * chore: replace div with fragment --------- Co-authored-by: Aditya Singh <adityasingh@Adityas-MacBook-Pro.local> fix: handle rate operators for table panel (#7695) * fix: handle rate operators for table panel chore: fix error rate (#7701) Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com> * feat(organization): minor cleanups * feat(organization): better naming for api and usecase * feat(organization): better packaging for modules * feat(organization): change hname to displayName * feat(organization): update the migration to use dialect * feat(organization): update the migration to use dialect * feat(organization): update the migration to use dialect * feat(organization): revert back to impl * feat(organization): remove DI from organization * feat(organization): address review comments * feat(organization): address review comments * feat(organization): address review comments --------- Signed-off-by: Shivanshu Raj Shrivastava <shivanshu1333@gmail.com>
201 lines
5.9 KiB
Go
201 lines
5.9 KiB
Go
package logparsingpipeline
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/SigNoz/signoz/pkg/query-service/model"
|
|
"github.com/SigNoz/signoz/pkg/sqlstore"
|
|
"github.com/SigNoz/signoz/pkg/types"
|
|
"github.com/SigNoz/signoz/pkg/types/authtypes"
|
|
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
|
|
"github.com/SigNoz/signoz/pkg/valuer"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Repo handles DDL and DML ops on ingestion pipeline
|
|
type Repo struct {
|
|
sqlStore sqlstore.SQLStore
|
|
}
|
|
|
|
const logPipelines = "log_pipelines"
|
|
|
|
// NewRepo initiates a new ingestion repo
|
|
func NewRepo(sqlStore sqlstore.SQLStore) Repo {
|
|
return Repo{
|
|
sqlStore: sqlStore,
|
|
}
|
|
}
|
|
|
|
// insertPipeline stores a given postable pipeline to database
|
|
func (r *Repo) insertPipeline(
|
|
ctx context.Context, orgID string, postable *pipelinetypes.PostablePipeline,
|
|
) (*pipelinetypes.GettablePipeline, *model.ApiError) {
|
|
if err := postable.IsValid(); err != nil {
|
|
return nil, model.BadRequest(errors.Wrap(err,
|
|
"pipeline is not valid",
|
|
))
|
|
}
|
|
|
|
rawConfig, err := json.Marshal(postable.Config)
|
|
if err != nil {
|
|
return nil, model.BadRequest(errors.Wrap(err,
|
|
"failed to unmarshal postable pipeline config",
|
|
))
|
|
}
|
|
filter, err := json.Marshal(postable.Filter)
|
|
if err != nil {
|
|
return nil, model.BadRequest(errors.Wrap(err,
|
|
"failed to marshal postable pipeline filter",
|
|
))
|
|
}
|
|
|
|
claims, ok := authtypes.ClaimsFromContext(ctx)
|
|
if !ok {
|
|
return nil, model.UnauthorizedError(fmt.Errorf("failed to get email from context"))
|
|
}
|
|
|
|
insertRow := &pipelinetypes.GettablePipeline{
|
|
StoreablePipeline: pipelinetypes.StoreablePipeline{
|
|
OrgID: orgID,
|
|
Identifiable: types.Identifiable{
|
|
ID: valuer.GenerateUUID(),
|
|
},
|
|
OrderID: postable.OrderID,
|
|
Enabled: postable.Enabled,
|
|
Name: postable.Name,
|
|
Alias: postable.Alias,
|
|
Description: postable.Description,
|
|
FilterString: string(filter),
|
|
ConfigJSON: string(rawConfig),
|
|
TimeAuditable: types.TimeAuditable{
|
|
CreatedAt: time.Now(),
|
|
},
|
|
UserAuditable: types.UserAuditable{
|
|
CreatedBy: claims.Email,
|
|
},
|
|
},
|
|
Filter: postable.Filter,
|
|
Config: postable.Config,
|
|
}
|
|
|
|
_, err = r.sqlStore.BunDB().NewInsert().
|
|
Model(&insertRow.StoreablePipeline).
|
|
Exec(ctx)
|
|
|
|
if err != nil {
|
|
zap.L().Error("error in inserting pipeline data", zap.Error(err))
|
|
return nil, model.InternalError(errors.Wrap(err, "failed to insert pipeline"))
|
|
}
|
|
|
|
return insertRow, nil
|
|
}
|
|
|
|
// getPipelinesByVersion returns pipelines associated with a given version
|
|
func (r *Repo) getPipelinesByVersion(
|
|
ctx context.Context, orgID string, version int,
|
|
) ([]pipelinetypes.GettablePipeline, []error) {
|
|
var errors []error
|
|
storablePipelines := []pipelinetypes.StoreablePipeline{}
|
|
err := r.sqlStore.BunDB().NewSelect().
|
|
Model(&storablePipelines).
|
|
Join("JOIN agent_config_elements e ON p.id = e.element_id").
|
|
Join("JOIN agent_config_versions v ON v.id = e.version_id").
|
|
Where("e.element_type = ?", logPipelines). // TODO: nitya - add org_id to this as well
|
|
Where("v.version = ?", version). // TODO: nitya - add org_id to this as well
|
|
Where("p.org_id = ?", orgID).
|
|
Order("p.order_id ASC").
|
|
Scan(ctx)
|
|
if err != nil {
|
|
return nil, []error{fmt.Errorf("failed to get pipelines from db: %v", err)}
|
|
}
|
|
|
|
gettablePipelines := make([]pipelinetypes.GettablePipeline, len(storablePipelines))
|
|
if len(storablePipelines) == 0 {
|
|
return gettablePipelines, nil
|
|
}
|
|
|
|
for i := range storablePipelines {
|
|
gettablePipelines[i].StoreablePipeline = storablePipelines[i]
|
|
if err := gettablePipelines[i].ParseRawConfig(); err != nil {
|
|
errors = append(errors, err)
|
|
}
|
|
if err := gettablePipelines[i].ParseFilter(); err != nil {
|
|
errors = append(errors, err)
|
|
}
|
|
}
|
|
|
|
return gettablePipelines, errors
|
|
}
|
|
|
|
func (r *Repo) GetDefaultOrgID(ctx context.Context) (string, *model.ApiError) {
|
|
var orgs []types.Organization
|
|
err := r.sqlStore.BunDB().NewSelect().
|
|
Model(&orgs).
|
|
Scan(ctx)
|
|
if err != nil {
|
|
return "", model.InternalError(errors.Wrap(err, "failed to get default org ID"))
|
|
}
|
|
if len(orgs) == 0 {
|
|
return "", model.InternalError(errors.New("no orgs found"))
|
|
}
|
|
return orgs[0].ID.StringValue(), nil
|
|
}
|
|
|
|
// GetPipelines returns pipeline and errors (if any)
|
|
func (r *Repo) GetPipeline(
|
|
ctx context.Context, orgID string, id string,
|
|
) (*pipelinetypes.GettablePipeline, *model.ApiError) {
|
|
storablePipelines := []pipelinetypes.StoreablePipeline{}
|
|
|
|
err := r.sqlStore.BunDB().NewSelect().
|
|
Model(&storablePipelines).
|
|
Where("id = ?", id).
|
|
Where("org_id = ?", orgID).
|
|
Scan(ctx)
|
|
if err != nil {
|
|
zap.L().Error("failed to get ingestion pipeline from db", zap.Error(err))
|
|
return nil, model.InternalError(errors.Wrap(err, "failed to get ingestion pipeline from db"))
|
|
}
|
|
|
|
if len(storablePipelines) == 0 {
|
|
zap.L().Warn("No row found for ingestion pipeline id", zap.String("id", id))
|
|
return nil, model.NotFoundError(fmt.Errorf("no row found for ingestion pipeline id %v", id))
|
|
}
|
|
|
|
if len(storablePipelines) == 1 {
|
|
gettablePipeline := pipelinetypes.GettablePipeline{}
|
|
gettablePipeline.StoreablePipeline = storablePipelines[0]
|
|
if err := gettablePipeline.ParseRawConfig(); err != nil {
|
|
zap.L().Error("invalid pipeline config found", zap.String("id", id), zap.Error(err))
|
|
return nil, model.InternalError(
|
|
errors.Wrap(err, "found an invalid pipeline config"),
|
|
)
|
|
}
|
|
if err := gettablePipeline.ParseFilter(); err != nil {
|
|
zap.L().Error("invalid pipeline filter found", zap.String("id", id), zap.Error(err))
|
|
return nil, model.InternalError(
|
|
errors.Wrap(err, "found an invalid pipeline filter"),
|
|
)
|
|
}
|
|
return &gettablePipeline, nil
|
|
}
|
|
|
|
return nil, model.InternalError(fmt.Errorf("multiple pipelines with same id"))
|
|
}
|
|
|
|
func (r *Repo) DeletePipeline(ctx context.Context, orgID string, id string) error {
|
|
_, err := r.sqlStore.BunDB().NewDelete().
|
|
Model(&pipelinetypes.StoreablePipeline{}).
|
|
Where("id = ?", id).
|
|
Where("org_id = ?", orgID).
|
|
Exec(ctx)
|
|
if err != nil {
|
|
return model.BadRequest(err)
|
|
}
|
|
return nil
|
|
}
|