chore: return 400 http status on pipeline validation error (#3472)

* chore: add integration test for log parsing pipelines validation

* chore: add helpers for creating unauthorized, unavailable & not found api errors

* chore: return *model.APIError from logpipeline and agentConf functions

* chore: some cleanup

* chore: some more cleanup

* chore: one more round of cleanups
This commit is contained in:
Raj Kamal Singh 2023-09-10 16:48:29 +05:30 committed by GitHub
parent 052c32ce78
commit dfd94f67bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 335 additions and 107 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/agentConf/sqlite" "go.signoz.io/signoz/pkg/query-service/agentConf/sqlite"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap" "go.uber.org/zap"
@ -31,7 +32,9 @@ func (r *Repo) initDB(engine string) error {
} }
} }
func (r *Repo) GetConfigHistory(ctx context.Context, typ ElementTypeDef, limit int) ([]ConfigVersion, error) { func (r *Repo) GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {
var c []ConfigVersion var c []ConfigVersion
err := r.db.SelectContext(ctx, &c, fmt.Sprintf(`SELECT err := r.db.SelectContext(ctx, &c, fmt.Sprintf(`SELECT
version, version,
@ -54,10 +57,16 @@ func (r *Repo) GetConfigHistory(ctx context.Context, typ ElementTypeDef, limit i
limit %v`, limit), limit %v`, limit),
typ) typ)
return c, err if err != nil {
return nil, model.InternalError(err)
}
return c, nil
} }
func (r *Repo) GetConfigVersion(ctx context.Context, typ ElementTypeDef, v int) (*ConfigVersion, error) { func (r *Repo) GetConfigVersion(
ctx context.Context, typ ElementTypeDef, v int,
) (*ConfigVersion, *model.ApiError) {
var c ConfigVersion var c ConfigVersion
err := r.db.GetContext(ctx, &c, `SELECT err := r.db.GetContext(ctx, &c, `SELECT
id, id,
@ -78,11 +87,19 @@ func (r *Repo) GetConfigVersion(ctx context.Context, typ ElementTypeDef, v int)
WHERE element_type = $1 WHERE element_type = $1
AND version = $2`, typ, v) AND version = $2`, typ, v)
return &c, err if err == sql.ErrNoRows {
return nil, model.NotFoundError(err)
}
if err != nil {
return nil, model.InternalError(err)
}
return &c, nil
} }
func (r *Repo) GetLatestVersion(ctx context.Context, typ ElementTypeDef) (*ConfigVersion, error) { func (r *Repo) GetLatestVersion(
ctx context.Context, typ ElementTypeDef,
) (*ConfigVersion, *model.ApiError) {
var c ConfigVersion var c ConfigVersion
err := r.db.GetContext(ctx, &c, `SELECT err := r.db.GetContext(ctx, &c, `SELECT
id, id,
@ -103,23 +120,31 @@ func (r *Repo) GetLatestVersion(ctx context.Context, typ ElementTypeDef) (*Confi
SELECT MAX(version) SELECT MAX(version)
FROM agent_config_versions FROM agent_config_versions
WHERE element_type=$2)`, typ, typ) WHERE element_type=$2)`, typ, typ)
if err != nil {
// intially the table will be empty if err == sql.ErrNoRows {
return nil, err return nil, model.NotFoundError(err)
} }
return &c, err if err != nil {
return nil, model.InternalError(err)
}
return &c, nil
} }
func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion, elements []string) (fnerr error) { func (r *Repo) insertConfig(
ctx context.Context, userId string, c *ConfigVersion, elements []string,
) (fnerr *model.ApiError) {
if string(c.ElementType) == "" { if string(c.ElementType) == "" {
return fmt.Errorf("element type is required for creating agent config version") return model.BadRequest(fmt.Errorf(
"element type is required for creating agent config version",
))
} }
// allowing empty elements for logs - use case is deleting all pipelines // allowing empty elements for logs - use case is deleting all pipelines
if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines { if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines {
zap.S().Error("insert config called with no elements ", c.ElementType) zap.S().Error("insert config called with no elements ", c.ElementType)
return fmt.Errorf("config must have atleast one element") return model.BadRequest(fmt.Errorf("config must have atleast one element"))
} }
if c.Version != 0 { if c.Version != 0 {
@ -127,15 +152,15 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion
// in a monotonically increasing order starting with 1. hence, we reject insert // in a monotonically increasing order starting with 1. hence, we reject insert
// requests with version anything other than 0. here, 0 indicates un-assigned // requests with version anything other than 0. here, 0 indicates un-assigned
zap.S().Error("invalid version assignment while inserting agent config", c.Version, c.ElementType) zap.S().Error("invalid version assignment while inserting agent config", c.Version, c.ElementType)
return fmt.Errorf("user defined versions are not supported in the agent config") return model.BadRequest(fmt.Errorf(
"user defined versions are not supported in the agent config",
))
} }
configVersion, err := r.GetLatestVersion(ctx, c.ElementType) configVersion, err := r.GetLatestVersion(ctx, c.ElementType)
if err != nil { if err != nil && err.Type() != model.ErrorNotFound {
if err != sql.ErrNoRows { zap.S().Error("failed to fetch latest config version", err)
zap.S().Error("failed to fetch latest config version", err) return model.InternalError(fmt.Errorf("failed to fetch latest config version"))
return fmt.Errorf("failed to fetch latest config version")
}
} }
if configVersion != nil { if configVersion != nil {
@ -166,7 +191,7 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion
deploy_result) deploy_result)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)` VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
_, err = r.db.ExecContext(ctx, _, dbErr := r.db.ExecContext(ctx,
configQuery, configQuery,
c.ID, c.ID,
c.Version, c.Version,
@ -178,9 +203,9 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion
c.DeployStatus, c.DeployStatus,
c.DeployResult) c.DeployResult)
if err != nil { if dbErr != nil {
zap.S().Error("error in inserting config version: ", zap.Error(err)) zap.S().Error("error in inserting config version: ", zap.Error(dbErr))
return fmt.Errorf("failed to insert ingestion rule") return model.InternalError(errors.Wrap(dbErr, "failed to insert ingestion rule"))
} }
elementsQuery := `INSERT INTO agent_config_elements( elementsQuery := `INSERT INTO agent_config_elements(
@ -191,15 +216,16 @@ func (r *Repo) insertConfig(ctx context.Context, userId string, c *ConfigVersion
VALUES ($1, $2, $3, $4)` VALUES ($1, $2, $3, $4)`
for _, e := range elements { for _, e := range elements {
_, dbErr = r.db.ExecContext(
_, err = r.db.ExecContext(ctx, ctx,
elementsQuery, elementsQuery,
uuid.NewString(), uuid.NewString(),
c.ID, c.ID,
c.ElementType, c.ElementType,
e) e,
if err != nil { )
return err if dbErr != nil {
return model.InternalError(dbErr)
} }
} }
@ -212,7 +238,7 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
status string, status string,
result string, result string,
lastHash string, lastHash string,
lastconf string) error { lastconf string) *model.ApiError {
updateQuery := `UPDATE agent_config_versions updateQuery := `UPDATE agent_config_versions
set deploy_status = $1, set deploy_status = $1,
@ -225,13 +251,15 @@ func (r *Repo) updateDeployStatus(ctx context.Context,
_, err := r.db.ExecContext(ctx, updateQuery, status, result, lastHash, lastconf, version, string(elementType)) _, err := r.db.ExecContext(ctx, updateQuery, status, result, lastHash, lastconf, version, string(elementType))
if err != nil { if err != nil {
zap.S().Error("failed to update deploy status", err) zap.S().Error("failed to update deploy status", err)
return model.BadRequestStr("failed to update deploy status") return model.BadRequest(fmt.Errorf("failed to update deploy status"))
} }
return nil return nil
} }
func (r *Repo) updateDeployStatusByHash(ctx context.Context, confighash string, status string, result string) error { func (r *Repo) updateDeployStatusByHash(
ctx context.Context, confighash string, status string, result string,
) *model.ApiError {
updateQuery := `UPDATE agent_config_versions updateQuery := `UPDATE agent_config_versions
set deploy_status = $1, set deploy_status = $1,
@ -241,7 +269,7 @@ func (r *Repo) updateDeployStatusByHash(ctx context.Context, confighash string,
_, err := r.db.ExecContext(ctx, updateQuery, status, result, confighash) _, err := r.db.ExecContext(ctx, updateQuery, status, result, confighash)
if err != nil { if err != nil {
zap.S().Error("failed to update deploy status", err) zap.S().Error("failed to update deploy status", err)
return model.BadRequestStr("failed to update deploy status") return model.InternalError(errors.Wrap(err, "failed to update deploy status"))
} }
return nil return nil

View File

@ -9,6 +9,7 @@ import (
"go.signoz.io/signoz/pkg/query-service/app/opamp" "go.signoz.io/signoz/pkg/query-service/app/opamp"
filterprocessor "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor" filterprocessor "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor"
tsp "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler" tsp "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler"
"go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap" "go.uber.org/zap"
yaml "gopkg.in/yaml.v3" yaml "gopkg.in/yaml.v3"
) )
@ -43,24 +44,32 @@ func Ready() bool {
return m.Ready() return m.Ready()
} }
func GetLatestVersion(ctx context.Context, elementType ElementTypeDef) (*ConfigVersion, error) { func GetLatestVersion(
ctx context.Context, elementType ElementTypeDef,
) (*ConfigVersion, *model.ApiError) {
return m.GetLatestVersion(ctx, elementType) return m.GetLatestVersion(ctx, elementType)
} }
func GetConfigVersion(ctx context.Context, elementType ElementTypeDef, version int) (*ConfigVersion, error) { func GetConfigVersion(
ctx context.Context, elementType ElementTypeDef, version int,
) (*ConfigVersion, *model.ApiError) {
return m.GetConfigVersion(ctx, elementType, version) return m.GetConfigVersion(ctx, elementType, version)
} }
func GetConfigHistory(ctx context.Context, typ ElementTypeDef, limit int) ([]ConfigVersion, error) { func GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {
return m.GetConfigHistory(ctx, typ, limit) return m.GetConfigHistory(ctx, typ, limit)
} }
// StartNewVersion launches a new config version for given set of elements // StartNewVersion launches a new config version for given set of elements
func StartNewVersion(ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string) (*ConfigVersion, error) { func StartNewVersion(
ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string,
) (*ConfigVersion, *model.ApiError) {
if !m.Ready() { if !m.Ready() {
// agent is already being updated, ask caller to wait and re-try after sometime // agent is already being updated, ask caller to wait and re-try after sometime
return nil, fmt.Errorf("agent updater is busy") return nil, model.UnavailableError(fmt.Errorf("agent updater is busy"))
} }
// create a new version // create a new version
@ -75,24 +84,24 @@ func StartNewVersion(ctx context.Context, userId string, eleType ElementTypeDef,
return cfg, nil return cfg, nil
} }
func Redeploy(ctx context.Context, typ ElementTypeDef, version int) error { func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiError {
configVersion, err := GetConfigVersion(ctx, typ, version) configVersion, err := GetConfigVersion(ctx, typ, version)
if err != nil { if err != nil {
zap.S().Debug("failed to fetch config version during redeploy", err) zap.S().Debug("failed to fetch config version during redeploy", err)
return fmt.Errorf("failed to fetch details of the config version") return model.WrapApiError(err, "failed to fetch details of the config version")
} }
if configVersion == nil || (configVersion != nil && configVersion.LastConf == "") { if configVersion == nil || (configVersion != nil && configVersion.LastConf == "") {
zap.S().Debug("config version has no conf yaml", configVersion) zap.S().Debug("config version has no conf yaml", configVersion)
return fmt.Errorf("the config version can not be redeployed") return model.BadRequest(fmt.Errorf("the config version can not be redeployed"))
} }
switch typ { switch typ {
case ElementTypeSamplingRules: case ElementTypeSamplingRules:
var config *tsp.Config var config *tsp.Config
if err := yaml.Unmarshal([]byte(configVersion.LastConf), &config); err != nil { if err := yaml.Unmarshal([]byte(configVersion.LastConf), &config); err != nil {
zap.S().Error("failed to read last conf correctly", err) zap.S().Error("failed to read last conf correctly", err)
return fmt.Errorf("failed to read the stored config correctly") return model.BadRequest(fmt.Errorf("failed to read the stored config correctly"))
} }
// merge current config with new filter params // merge current config with new filter params
@ -104,7 +113,7 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) error {
configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate) configHash, err := opamp.UpsertControlProcessors(ctx, "traces", processorConf, m.OnConfigUpdate)
if err != nil { if err != nil {
zap.S().Error("failed to call agent config update for trace processor:", err) zap.S().Error("failed to call agent config update for trace processor:", err)
return fmt.Errorf("failed to deploy the config") return model.InternalError(fmt.Errorf("failed to deploy the config"))
} }
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf) m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
@ -112,7 +121,7 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) error {
var filterConfig *filterprocessor.Config var filterConfig *filterprocessor.Config
if err := yaml.Unmarshal([]byte(configVersion.LastConf), &filterConfig); err != nil { if err := yaml.Unmarshal([]byte(configVersion.LastConf), &filterConfig); err != nil {
zap.S().Error("failed to read last conf correctly", err) zap.S().Error("failed to read last conf correctly", err)
return fmt.Errorf("failed to read the stored config correctly") return model.InternalError(fmt.Errorf("failed to read the stored config correctly"))
} }
processorConf := map[string]interface{}{ processorConf := map[string]interface{}{
"filter": filterConfig, "filter": filterConfig,
@ -151,9 +160,9 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
return err return err
} }
processorConfYaml, err := yaml.Marshal(config) processorConfYaml, yamlErr := yaml.Marshal(config)
if err != nil { if yamlErr != nil {
zap.S().Warnf("unexpected error while transforming processor config to yaml", err) zap.S().Warnf("unexpected error while transforming processor config to yaml", yamlErr)
} }
m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml)) m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
@ -202,9 +211,9 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi
return err return err
} }
processorConfYaml, err := yaml.Marshal(config) processorConfYaml, yamlErr := yaml.Marshal(config)
if err != nil { if yamlErr != nil {
zap.S().Warnf("unexpected error while transforming processor config to yaml", err) zap.S().Warnf("unexpected error while transforming processor config to yaml", yamlErr)
} }
m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml)) m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
@ -212,9 +221,15 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi
} }
// UpsertLogParsingProcessors updates the agent with log parsing processors // UpsertLogParsingProcessors updates the agent with log parsing processors
func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error { func UpsertLogParsingProcessor(
ctx context.Context,
version int,
rawPipelineData []byte,
config map[string]interface{},
names []string,
) *model.ApiError {
if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) { if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
return fmt.Errorf("agent updater is busy") return model.UnavailableError(fmt.Errorf("agent updater is busy"))
} }
defer atomic.StoreUint32(&m.lock, 0) defer atomic.StoreUint32(&m.lock, 0)

View File

@ -3,7 +3,6 @@ package app
import ( import (
"bytes" "bytes"
"context" "context"
"database/sql"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -2423,7 +2422,7 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
version, err := parseAgentConfigVersion(r) version, err := parseAgentConfigVersion(r)
if err != nil { if err != nil {
RespondError(w, err, nil) RespondError(w, model.WrapApiError(err, "Failed to parse agent config version"), nil)
return return
} }
@ -2444,12 +2443,14 @@ func (ah *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
} }
// listLogsPipelines lists logs piplines for latest version // listLogsPipelines lists logs piplines for latest version
func (ah *APIHandler) listLogsPipelines(ctx context.Context) (*logparsingpipeline.PipelinesResponse, *model.ApiError) { func (ah *APIHandler) listLogsPipelines(ctx context.Context) (
*logparsingpipeline.PipelinesResponse, *model.ApiError,
) {
// get lateset agent config // get lateset agent config
lastestConfig, err := agentConf.GetLatestVersion(ctx, logPipelines) lastestConfig, err := agentConf.GetLatestVersion(ctx, logPipelines)
if err != nil { if err != nil {
if err != sql.ErrNoRows { if err.Type() != model.ErrorNotFound {
return nil, model.InternalError(fmt.Errorf("failed to get latest agent config version with error %w", err)) return nil, model.WrapApiError(err, "failed to get latest agent config version")
} else { } else {
return nil, nil return nil, nil
} }
@ -2457,31 +2458,33 @@ func (ah *APIHandler) listLogsPipelines(ctx context.Context) (*logparsingpipelin
payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, lastestConfig.Version) payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, lastestConfig.Version)
if err != nil { if err != nil {
return nil, model.InternalError(fmt.Errorf("failed to get pipelines with error %w", err)) return nil, model.WrapApiError(err, "failed to get pipelines")
} }
// todo(Nitya): make a new API for history pagination // todo(Nitya): make a new API for history pagination
limit := 10 limit := 10
history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit) history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit)
if err != nil { if err != nil {
return nil, model.InternalError(fmt.Errorf("failed to get config history with error %w", err)) return nil, model.WrapApiError(err, "failed to get config history")
} }
payload.History = history payload.History = history
return payload, nil return payload, nil
} }
// listLogsPipelinesByVersion lists pipelines along with config version history // listLogsPipelinesByVersion lists pipelines along with config version history
func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) (*logparsingpipeline.PipelinesResponse, *model.ApiError) { func (ah *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) (
*logparsingpipeline.PipelinesResponse, *model.ApiError,
) {
payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version) payload, err := ah.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version)
if err != nil { if err != nil {
return nil, model.InternalError(err) return nil, model.WrapApiError(err, "failed to get pipelines by version")
} }
// todo(Nitya): make a new API for history pagination // todo(Nitya): make a new API for history pagination
limit := 10 limit := 10
history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit) history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit)
if err != nil { if err != nil {
return nil, model.InternalError(fmt.Errorf("failed to retrieve agent config history with error %w", err)) return nil, model.WrapApiError(err, "failed to retrieve agent config history")
} }
payload.History = history payload.History = history
@ -2499,7 +2502,10 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
ctx := auth.AttachJwtToContext(context.Background(), r) ctx := auth.AttachJwtToContext(context.Background(), r)
createPipeline := func(ctx context.Context, postable []logparsingpipeline.PostablePipeline) (*logparsingpipeline.PipelinesResponse, error) { createPipeline := func(
ctx context.Context,
postable []logparsingpipeline.PostablePipeline,
) (*logparsingpipeline.PipelinesResponse, *model.ApiError) {
if len(postable) == 0 { if len(postable) == 0 {
zap.S().Warnf("found no pipelines in the http request, this will delete all the pipelines") zap.S().Warnf("found no pipelines in the http request, this will delete all the pipelines")
} }
@ -2515,7 +2521,7 @@ func (ah *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
res, err := createPipeline(ctx, req.Pipelines) res, err := createPipeline(ctx, req.Pipelines)
if err != nil { if err != nil {
RespondError(w, model.InternalError(err), nil) RespondError(w, err, nil)
return return
} }

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/agentConf" "go.signoz.io/signoz/pkg/query-service/agentConf"
"go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/auth"
"go.signoz.io/signoz/pkg/query-service/model" "go.signoz.io/signoz/pkg/query-service/model"
@ -32,11 +33,14 @@ type PipelinesResponse struct {
} }
// ApplyPipelines stores new or changed pipelines and initiates a new config update // ApplyPipelines stores new or changed pipelines and initiates a new config update
func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, postable []PostablePipeline) (*PipelinesResponse, error) { func (ic *LogParsingPipelineController) ApplyPipelines(
ctx context.Context,
postable []PostablePipeline,
) (*PipelinesResponse, *model.ApiError) {
// get user id from context // get user id from context
userId, err := auth.ExtractUserIdFromContext(ctx) userId, authErr := auth.ExtractUserIdFromContext(ctx)
if err != nil { if authErr != nil {
return nil, model.InternalError(fmt.Errorf("failed to get userId from context %v", err)) return nil, model.UnauthorizedError(errors.Wrap(authErr, "failed to get userId from context"))
} }
var pipelines []model.Pipeline var pipelines []model.Pipeline
@ -51,17 +55,17 @@ func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, post
if r.Id == "" { if r.Id == "" {
// looks like a new or changed pipeline, store it first // looks like a new or changed pipeline, store it first
inserted, err := ic.insertPipeline(ctx, &r) inserted, err := ic.insertPipeline(ctx, &r)
if err != nil || inserted == nil { if err != nil {
zap.S().Errorf("failed to insert edited pipeline %s", err.Error()) zap.S().Errorf("failed to insert edited pipeline %s", err.Error())
return nil, fmt.Errorf("failed to insert edited pipeline") return nil, model.WrapApiError(err, "failed to insert edited pipeline")
} else { } else {
pipelines = append(pipelines, *inserted) pipelines = append(pipelines, *inserted)
} }
} else { } else {
selected, err := ic.GetPipeline(ctx, r.Id) selected, err := ic.GetPipeline(ctx, r.Id)
if err != nil || selected == nil { if err != nil {
zap.S().Errorf("failed to find edited pipeline %s", err.Error()) zap.S().Errorf("failed to find edited pipeline %s", err.Error())
return nil, fmt.Errorf("failed to find pipeline, invalid request") return nil, model.WrapApiError(err, "failed to find edited pipeline")
} }
pipelines = append(pipelines, *selected) pipelines = append(pipelines, *selected)
} }
@ -69,14 +73,18 @@ func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, post
} }
// prepare filter config (processor) from the pipelines // prepare filter config (processor) from the pipelines
filterConfig, names, err := PreparePipelineProcessor(pipelines) filterConfig, names, translationErr := PreparePipelineProcessor(pipelines)
if err != nil { if translationErr != nil {
zap.S().Errorf("failed to generate processor config from pipelines for deployment %s", err.Error()) zap.S().Errorf("failed to generate processor config from pipelines for deployment %w", translationErr)
return nil, err return nil, model.BadRequest(errors.Wrap(
translationErr, "failed to generate processor config from pipelines for deployment",
))
} }
if !agentConf.Ready() { if !agentConf.Ready() {
return nil, fmt.Errorf("agent updater unavailable at the moment. Please try in sometime") return nil, model.UnavailableError(fmt.Errorf(
"agent updater unavailable at the moment. Please try in sometime",
))
} }
// prepare config elements // prepare config elements
@ -107,22 +115,24 @@ func (ic *LogParsingPipelineController) ApplyPipelines(ctx context.Context, post
} }
if err != nil { if err != nil {
return response, fmt.Errorf("failed to apply pipelines") return response, model.WrapApiError(err, "failed to apply pipelines")
} }
return response, nil return response, nil
} }
// GetPipelinesByVersion responds with version info and associated pipelines // GetPipelinesByVersion responds with version info and associated pipelines
func (ic *LogParsingPipelineController) GetPipelinesByVersion(ctx context.Context, version int) (*PipelinesResponse, error) { func (ic *LogParsingPipelineController) GetPipelinesByVersion(
ctx context.Context, version int,
) (*PipelinesResponse, *model.ApiError) {
pipelines, errors := ic.getPipelinesByVersion(ctx, version) pipelines, errors := ic.getPipelinesByVersion(ctx, version)
if errors != nil { if errors != nil {
zap.S().Errorf("failed to get pipelines for version %d, %w", version, errors) zap.S().Errorf("failed to get pipelines for version %d, %w", version, errors)
return nil, fmt.Errorf("failed to get pipelines for given version") return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version"))
} }
configVersion, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version) configVersion, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version)
if err != nil || configVersion == nil { if err != nil {
zap.S().Errorf("failed to get config for version %d, %s", version, err.Error()) zap.S().Errorf("failed to get config for version %d, %s", version, err.Error())
return nil, fmt.Errorf("failed to get config for given version") return nil, model.WrapApiError(err, "failed to get config for given version")
} }
return &PipelinesResponse{ return &PipelinesResponse{

View File

@ -39,24 +39,30 @@ func (r *Repo) InitDB(engine string) error {
} }
// insertPipeline stores a given postable pipeline to database // insertPipeline stores a given postable pipeline to database
func (r *Repo) insertPipeline(ctx context.Context, postable *PostablePipeline) (*model.Pipeline, error) { func (r *Repo) insertPipeline(
ctx context.Context, postable *PostablePipeline,
) (*model.Pipeline, *model.ApiError) {
if err := postable.IsValid(); err != nil { if err := postable.IsValid(); err != nil {
return nil, errors.Wrap(err, "failed to validate postable pipeline") return nil, model.BadRequest(errors.Wrap(err,
"pipeline is not valid",
))
} }
rawConfig, err := json.Marshal(postable.Config) rawConfig, err := json.Marshal(postable.Config)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal postable pipeline config") return nil, model.BadRequest(errors.Wrap(err,
"failed to unmarshal postable pipeline config",
))
} }
jwt, err := auth.ExtractJwtFromContext(ctx) jwt, err := auth.ExtractJwtFromContext(ctx)
if err != nil { if err != nil {
return nil, err return nil, model.UnauthorizedError(err)
} }
claims, err := auth.ParseJWT(jwt) claims, err := auth.ParseJWT(jwt)
if err != nil { if err != nil {
return nil, err return nil, model.UnauthorizedError(err)
} }
insertRow := &model.Pipeline{ insertRow := &model.Pipeline{
@ -94,7 +100,7 @@ func (r *Repo) insertPipeline(ctx context.Context, postable *PostablePipeline) (
if err != nil { if err != nil {
zap.S().Errorf("error in inserting pipeline data: ", zap.Error(err)) zap.S().Errorf("error in inserting pipeline data: ", zap.Error(err))
return insertRow, errors.Wrap(err, "failed to insert pipeline") return nil, model.InternalError(errors.Wrap(err, "failed to insert pipeline"))
} }
return insertRow, nil return insertRow, nil
@ -143,7 +149,9 @@ func (r *Repo) getPipelinesByVersion(ctx context.Context, version int) ([]model.
} }
// GetPipelines returns pipeline and errors (if any) // GetPipelines returns pipeline and errors (if any)
func (r *Repo) GetPipeline(ctx context.Context, id string) (*model.Pipeline, error) { func (r *Repo) GetPipeline(
ctx context.Context, id string,
) (*model.Pipeline, *model.ApiError) {
pipelines := []model.Pipeline{} pipelines := []model.Pipeline{}
pipelineQuery := `SELECT id, pipelineQuery := `SELECT id,
@ -162,25 +170,26 @@ func (r *Repo) GetPipeline(ctx context.Context, id string) (*model.Pipeline, err
err := r.db.SelectContext(ctx, &pipelines, pipelineQuery, id) err := r.db.SelectContext(ctx, &pipelines, pipelineQuery, id)
if err != nil { if err != nil {
zap.S().Errorf("failed to get ingestion pipeline from db", err) zap.S().Errorf("failed to get ingestion pipeline from db", err)
return nil, model.BadRequestStr("failed to get ingestion pipeline from db") return nil, model.InternalError(errors.Wrap(err, "failed to get ingestion pipeline from db"))
} }
if len(pipelines) == 0 { if len(pipelines) == 0 {
zap.S().Warnf("No row found for ingestion pipeline id", id) zap.S().Warnf("No row found for ingestion pipeline id", id)
return nil, nil return nil, model.NotFoundError(fmt.Errorf("No row found for ingestion pipeline id %v", id))
} }
if len(pipelines) == 1 { if len(pipelines) == 1 {
err := pipelines[0].ParseRawConfig() err := pipelines[0].ParseRawConfig()
if err != nil { if err != nil {
zap.S().Errorf("invalid pipeline config found", id, err) zap.S().Errorf("invalid pipeline config found", id, err)
return &pipelines[0], model.InternalError(fmt.Errorf("found an invalid pipeline config ")) return nil, model.InternalError(
errors.Wrap(err, "found an invalid pipeline config"),
)
} }
return &pipelines[0], nil return &pipelines[0], nil
} }
return nil, model.InternalError(fmt.Errorf("multiple pipelines with same id")) return nil, model.InternalError(fmt.Errorf("multiple pipelines with same id"))
} }
func (r *Repo) DeletePipeline(ctx context.Context, id string) error { func (r *Repo) DeletePipeline(ctx context.Context, id string) error {

View File

@ -10,12 +10,18 @@ import (
"go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap"
model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" model "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig" "go.signoz.io/signoz/pkg/query-service/app/opamp/otelconfig"
coreModel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap" "go.uber.org/zap"
) )
// inserts or updates ingestion controller processors depending // inserts or updates ingestion controller processors depending
// on the signal (metrics or traces) // on the signal (metrics or traces)
func UpsertControlProcessors(ctx context.Context, signal string, processors map[string]interface{}, callback model.OnChangeCallback) (hash string, fnerr error) { func UpsertControlProcessors(
ctx context.Context,
signal string,
processors map[string]interface{},
callback model.OnChangeCallback,
) (hash string, fnerr *coreModel.ApiError) {
// note: only processors enabled through tracesPipelinePlan will be added // note: only processors enabled through tracesPipelinePlan will be added
// to pipeline. To enable or disable processors from pipeline, call // to pipeline. To enable or disable processors from pipeline, call
// AddToTracePipeline() or RemoveFromTracesPipeline() prior to calling // AddToTracePipeline() or RemoveFromTracesPipeline() prior to calling
@ -25,24 +31,28 @@ func UpsertControlProcessors(ctx context.Context, signal string, processors map[
if signal != string(Metrics) && signal != string(Traces) { if signal != string(Metrics) && signal != string(Traces) {
zap.S().Error("received invalid signal int UpsertControlProcessors", signal) zap.S().Error("received invalid signal int UpsertControlProcessors", signal)
fnerr = fmt.Errorf("signal not supported in ingestion rules: %s", signal) fnerr = coreModel.BadRequest(fmt.Errorf(
"signal not supported in ingestion rules: %s", signal,
))
return return
} }
if opAmpServer == nil { if opAmpServer == nil {
fnerr = fmt.Errorf("opamp server is down, unable to push config to agent at this moment") fnerr = coreModel.UnavailableError(fmt.Errorf(
"opamp server is down, unable to push config to agent at this moment",
))
return return
} }
agents := opAmpServer.agents.GetAllAgents() agents := opAmpServer.agents.GetAllAgents()
if len(agents) == 0 { if len(agents) == 0 {
fnerr = fmt.Errorf("no agents available at the moment") fnerr = coreModel.UnavailableError(fmt.Errorf("no agents available at the moment"))
return return
} }
if len(agents) > 1 && signal == string(Traces) { if len(agents) > 1 && signal == string(Traces) {
zap.S().Debug("found multiple agents. this feature is not supported for traces pipeline (sampling rules)") zap.S().Debug("found multiple agents. this feature is not supported for traces pipeline (sampling rules)")
fnerr = fmt.Errorf("multiple agents not supported in sampling rules") fnerr = coreModel.BadRequest(fmt.Errorf("multiple agents not supported in sampling rules"))
return return
} }

View File

@ -12,37 +12,49 @@ import (
"github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/protobufs"
model "go.signoz.io/signoz/pkg/query-service/app/opamp/model" model "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
"go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/constants"
coreModel "go.signoz.io/signoz/pkg/query-service/model"
"go.uber.org/zap" "go.uber.org/zap"
) )
var lockLogsPipelineSpec sync.RWMutex var lockLogsPipelineSpec sync.RWMutex
func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []string, callback func(string, string, error)) (string, error) { func UpsertLogsParsingProcessor(
ctx context.Context,
parsingProcessors map[string]interface{},
parsingProcessorsNames []string,
callback func(string, string, error),
) (string, *coreModel.ApiError) {
confHash := "" confHash := ""
if opAmpServer == nil { if opAmpServer == nil {
return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment") return confHash, coreModel.UnavailableError(fmt.Errorf(
"opamp server is down, unable to push config to agent at this moment",
))
} }
agents := opAmpServer.agents.GetAllAgents() agents := opAmpServer.agents.GetAllAgents()
if len(agents) == 0 { if len(agents) == 0 {
return confHash, fmt.Errorf("no agents available at the moment") return confHash, coreModel.UnavailableError(fmt.Errorf(
"no agents available at the moment",
))
} }
for _, agent := range agents { for _, agent := range agents {
config := agent.EffectiveConfig config := agent.EffectiveConfig
c, err := yaml.Parser().Unmarshal([]byte(config)) c, err := yaml.Parser().Unmarshal([]byte(config))
if err != nil { if err != nil {
return confHash, err return confHash, coreModel.BadRequest(err)
} }
buildLogParsingProcessors(c, parsingProcessors) buildLogParsingProcessors(c, parsingProcessors)
p, err := getOtelPipelinFromConfig(c) p, err := getOtelPipelinFromConfig(c)
if err != nil { if err != nil {
return confHash, err return confHash, coreModel.BadRequest(err)
} }
if p.Pipelines.Logs == nil { if p.Pipelines.Logs == nil {
return confHash, fmt.Errorf("logs pipeline doesn't exist") return confHash, coreModel.InternalError(fmt.Errorf(
"logs pipeline doesn't exist",
))
} }
// build the new processor list // build the new processor list
@ -54,19 +66,19 @@ func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[strin
updatedConf, err := yaml.Parser().Marshal(c) updatedConf, err := yaml.Parser().Marshal(c)
if err != nil { if err != nil {
return confHash, err return confHash, coreModel.BadRequest(err)
} }
// zap.S().Infof("sending new config", string(updatedConf)) // zap.S().Infof("sending new config", string(updatedConf))
hash := sha256.New() hash := sha256.New()
_, err = hash.Write(updatedConf) _, err = hash.Write(updatedConf)
if err != nil { if err != nil {
return confHash, err return confHash, coreModel.InternalError(err)
} }
agent.EffectiveConfig = string(updatedConf) agent.EffectiveConfig = string(updatedConf)
err = agent.Upsert() err = agent.Upsert()
if err != nil { if err != nil {
return confHash, err return confHash, coreModel.InternalError(err)
} }
agent.SendToAgent(&protobufs.ServerToAgent{ agent.SendToAgent(&protobufs.ServerToAgent{

View File

@ -7,6 +7,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/util/stats"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
@ -89,6 +90,34 @@ func InternalError(err error) *ApiError {
} }
} }
func NotFoundError(err error) *ApiError {
return &ApiError{
Typ: ErrorNotFound,
Err: err,
}
}
func UnauthorizedError(err error) *ApiError {
return &ApiError{
Typ: ErrorUnauthorized,
Err: err,
}
}
func UnavailableError(err error) *ApiError {
return &ApiError{
Typ: ErrorUnavailable,
Err: err,
}
}
func WrapApiError(err *ApiError, msg string) *ApiError {
return &ApiError{
Typ: err.Type(),
Err: errors.Wrap(err.ToError(), msg),
}
}
type QueryDataV2 struct { type QueryDataV2 struct {
ResultType parser.ValueType `json:"resultType"` ResultType parser.ValueType `json:"resultType"`
Result parser.Value `json:"result"` Result parser.Value `json:"result"`

View File

@ -149,6 +149,115 @@ func TestLogPipelinesLifecycle(t *testing.T) {
) )
} }
func TestLogPipelinesValidation(t *testing.T) {
testCases := []struct {
Name string
Pipeline logparsingpipeline.PostablePipeline
ExpectedResponseStatusCode int
}{
{
Name: "Valid Pipeline",
Pipeline: logparsingpipeline.PostablePipeline{
OrderId: 1,
Name: "pipeline 1",
Alias: "pipeline1",
Enabled: true,
Filter: "attributes.method == \"GET\"",
Config: []model.PipelineOperator{
{
OrderId: 1,
ID: "add",
Type: "add",
Field: "attributes.test",
Value: "val",
Enabled: true,
Name: "test add",
},
},
},
ExpectedResponseStatusCode: 200,
},
{
Name: "Invalid orderId",
Pipeline: logparsingpipeline.PostablePipeline{
OrderId: 0,
Name: "pipeline 1",
Alias: "pipeline1",
Enabled: true,
Filter: "attributes.method == \"GET\"",
Config: []model.PipelineOperator{
{
OrderId: 1,
ID: "add",
Type: "add",
Field: "attributes.test",
Value: "val",
Enabled: true,
Name: "test add",
},
},
},
ExpectedResponseStatusCode: 400,
},
{
Name: "Invalid filter",
Pipeline: logparsingpipeline.PostablePipeline{
OrderId: 1,
Name: "pipeline 1",
Alias: "pipeline1",
Enabled: true,
Filter: "bad filter",
Config: []model.PipelineOperator{
{
OrderId: 1,
ID: "add",
Type: "add",
Field: "attributes.test",
Value: "val",
Enabled: true,
Name: "test add",
},
},
},
ExpectedResponseStatusCode: 400,
},
{
Name: "Invalid operator field",
Pipeline: logparsingpipeline.PostablePipeline{
OrderId: 1,
Name: "pipeline 1",
Alias: "pipeline1",
Enabled: true,
Filter: "attributes.method == \"GET\"",
Config: []model.PipelineOperator{
{
OrderId: 1,
ID: "add",
Type: "add",
Field: "bad.field",
Value: "val",
Enabled: true,
Name: "test add",
},
},
},
ExpectedResponseStatusCode: 400,
},
}
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
testbed := NewLogPipelinesTestBed(t)
testbed.PostPipelinesToQSExpectingStatusCode(
logparsingpipeline.PostablePipelines{
Pipelines: []logparsingpipeline.PostablePipeline{tc.Pipeline},
},
tc.ExpectedResponseStatusCode,
)
})
}
}
// LogPipelinesTestBed coordinates and mocks components involved in // LogPipelinesTestBed coordinates and mocks components involved in
// configuring log pipelines and provides test helpers. // configuring log pipelines and provides test helpers.
type LogPipelinesTestBed struct { type LogPipelinesTestBed struct {
@ -282,7 +391,7 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline
if response.StatusCode != 200 { if response.StatusCode != 200 {
tb.t.Fatalf( tb.t.Fatalf(
"could not list log parsing pipelines. status: %d, body: %v", "could not list log parsing pipelines. status: %d, body: %v",
response.StatusCode, responseBody, response.StatusCode, string(responseBody),
) )
} }
@ -291,7 +400,7 @@ func (tb *LogPipelinesTestBed) GetPipelinesFromQS() *logparsingpipeline.Pipeline
if err != nil { if err != nil {
tb.t.Fatalf( tb.t.Fatalf(
"Could not unmarshal QS response into an ApiResponse.\nResponse body: %s", "Could not unmarshal QS response into an ApiResponse.\nResponse body: %s",
responseBody, string(responseBody),
) )
} }
pipelinesResp, err := unmarshalPipelinesResponse(&result) pipelinesResp, err := unmarshalPipelinesResponse(&result)