Nityananda Gohain 1a3e46cecd
feat: integrate pipelines API (#2457)
* chore: integrate pipelines API

* fix: limit support integrated in pipelines

* fix: interface to string

* fix: json parser and allow deleting all pipelines

* fix: output modified if operators are disabled

* fix: validation updated for operators

* fix: expression check added

* fix: regex expression check added

* fix: remove operator validation updated

* fix: tests updated for pipeline builder

* fix: fix error messages in http handler

* fix: dont return payload if there is an error

* fix: extracting userId from context moved to auth package

* fix: api errors moved to http handler

* fix: get version logic updated

* fix: deployment result message updated

* fix: pipeline builder edgecase fixed and tests updated

* fix: get failing postablePipeline tests to pass

---------

Co-authored-by: Vishal Sharma <makeavish786@gmail.com>
Co-authored-by: Raj <rkssisodiya@gmail.com>
2023-07-31 21:34:42 +05:30

199 lines
4.6 KiB
Go

package logparsingpipeline
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline/sqlite"
"go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap"
)
// Repo is the persistence layer for log parsing pipelines. It wraps a sqlx
// database handle and exposes the schema setup (DDL) and the row-level
// insert/select/delete queries (DML) that operate on stored pipelines.
type Repo struct {
// db is the handle every query issued by Repo is executed against.
db *sqlx.DB
}
// logPipelines is the value matched against agent_config_elements.element_type
// when selecting the pipelines that belong to an agent config version.
const logPipelines = "log_pipelines"
// NewRepo builds a Repo whose queries run against the supplied database handle.
// The handle is stored as-is; no connection check or schema setup happens here.
func NewRepo(db *sqlx.DB) Repo {
	return Repo{db: db}
}
// InitDB prepares pipeline storage for the named database engine.
//
// Only the "sqlite3" and "sqlite" engine names are accepted; both delegate to
// sqlite.InitDB on the repo's handle. Any other name yields an
// "unsupported db" error.
func (r *Repo) InitDB(engine string) error {
	if engine == "sqlite3" || engine == "sqlite" {
		return sqlite.InitDB(r.db)
	}
	return fmt.Errorf("unsupported db")
}
// insertPipeline validates a postable pipeline, stamps it with a freshly
// generated ID and the creator taken from the JWT carried in ctx, and inserts
// it into the pipelines table.
//
// On success it returns the row that was stored. On any failure (nil or
// invalid input, config serialization, JWT extraction/parsing, a missing or
// non-string "email" claim, or the INSERT itself) it returns a nil pipeline
// together with a non-nil error, so callers never receive a row that was not
// persisted.
func (r *Repo) insertPipeline(ctx context.Context, postable *PostablePipeline) (*model.Pipeline, error) {
	if postable == nil {
		return nil, errors.New("postable pipeline is nil")
	}
	if err := postable.IsValid(); err != nil {
		return nil, errors.Wrap(err, "failed to validate postable pipeline")
	}

	// The config is stored as a JSON string alongside the structured form.
	rawConfig, err := json.Marshal(postable.Config)
	if err != nil {
		return nil, errors.Wrap(err, "failed to marshal postable pipeline config")
	}

	jwt, err := auth.ExtractJwtFromContext(ctx)
	if err != nil {
		return nil, err
	}

	claims, err := auth.ParseJWT(jwt)
	if err != nil {
		return nil, err
	}

	// Use the comma-ok form: a bare type assertion would panic when the
	// token carries no "email" claim or the claim is not a string.
	createdBy, ok := claims["email"].(string)
	if !ok {
		return nil, errors.New("email claim is missing from jwt or is not a string")
	}

	insertRow := &model.Pipeline{
		Id:          uuid.New().String(),
		OrderId:     postable.OrderId,
		Enabled:     postable.Enabled,
		Name:        postable.Name,
		Alias:       postable.Alias,
		Description: &postable.Description,
		Filter:      postable.Filter,
		Config:      postable.Config,
		RawConfig:   string(rawConfig),
		Creator: model.Creator{
			CreatedBy: createdBy,
			CreatedAt: time.Now(),
		},
	}

	insertQuery := `INSERT INTO pipelines
(id, order_id, enabled, created_by, created_at, name, alias, description, filter, config_json)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`

	_, err = r.db.ExecContext(ctx,
		insertQuery,
		insertRow.Id,
		insertRow.OrderId,
		insertRow.Enabled,
		insertRow.Creator.CreatedBy,
		insertRow.Creator.CreatedAt,
		insertRow.Name,
		insertRow.Alias,
		insertRow.Description,
		insertRow.Filter,
		insertRow.RawConfig)
	if err != nil {
		zap.S().Errorf("error in inserting pipeline data: %v", err)
		return nil, errors.Wrap(err, "failed to insert pipeline")
	}

	return insertRow, nil
}
// getPipelinesByVersion returns the pipelines attached to the given agent
// config version, ordered by order_id ascending.
//
// If the query fails, it returns a nil slice and a single-element error slice
// wrapping the database error. Otherwise it returns every matching row (an
// empty, non-nil slice when there are none) plus one error per pipeline whose
// stored config could not be parsed; the error slice is nil when all configs
// parsed. Pipelines with unparsable configs are still included in the result.
func (r *Repo) getPipelinesByVersion(ctx context.Context, version int) ([]model.Pipeline, []error) {
	pipelines := []model.Pipeline{}

	versionQuery := `SELECT r.id,
r.name,
r.config_json,
r.alias,
r.description,
r.filter,
r.order_id,
r.created_by,
r.created_at,
r.enabled
FROM pipelines r,
agent_config_elements e,
agent_config_versions v
WHERE r.id = e.element_id
AND v.id = e.version_id
AND e.element_type = $1
AND v.version = $2
ORDER BY order_id asc`

	if err := r.db.SelectContext(ctx, &pipelines, versionQuery, logPipelines, version); err != nil {
		return nil, []error{fmt.Errorf("failed to get pipelines from db: %w", err)}
	}

	// Named errs (not errors) so the imported errors package is not shadowed.
	var errs []error
	for i := range pipelines {
		// Index into the slice so ParseRawConfig mutates the stored element,
		// not a per-iteration copy.
		if err := pipelines[i].ParseRawConfig(); err != nil {
			errs = append(errs, err)
		}
	}

	return pipelines, errs
}
// GetPipeline looks up a single pipeline by its id.
//
// Outcomes:
//   - exactly one row whose config parses: (pipeline, nil)
//   - no matching row: (nil, nil); a warning is logged
//   - exactly one row whose config fails to parse: (pipeline, internal error)
//   - query failure: (nil, bad-request error)
//   - more than one row with the same id: (nil, internal error)
func (r *Repo) GetPipeline(ctx context.Context, id string) (*model.Pipeline, error) {
	pipelines := []model.Pipeline{}

	pipelineQuery := `SELECT id,
name,
config_json,
alias,
description,
filter,
order_id,
created_by,
created_at,
enabled
FROM pipelines
WHERE id = $1`

	if err := r.db.SelectContext(ctx, &pipelines, pipelineQuery, id); err != nil {
		zap.S().Errorf("failed to get ingestion pipeline from db: %v", err)
		return nil, model.BadRequestStr("failed to get ingestion pipeline from db")
	}

	if len(pipelines) == 0 {
		zap.S().Warnf("No row found for ingestion pipeline id %s", id)
		return nil, nil
	}

	if len(pipelines) > 1 {
		return nil, model.InternalError(fmt.Errorf("multiple pipelines with same id"))
	}

	pipeline := &pipelines[0]
	if err := pipeline.ParseRawConfig(); err != nil {
		zap.S().Errorf("invalid pipeline config found for id %s: %v", id, err)
		// The row is still handed back so the caller can inspect it even
		// though its config could not be parsed.
		return pipeline, model.InternalError(fmt.Errorf("found an invalid pipeline config "))
	}

	return pipeline, nil
}
// DeletePipeline removes the row with the given id from the pipelines table.
//
// A failed DELETE is reported as model.BadRequest wrapping the underlying
// error. The statement result is discarded, so an id that matches no row is
// not reported as an error.
func (r *Repo) DeletePipeline(ctx context.Context, id string) error {
	const stmt = `DELETE
FROM pipelines
WHERE id = $1`

	if _, execErr := r.db.ExecContext(ctx, stmt, id); execErr != nil {
		return model.BadRequest(execErr)
	}
	return nil
}