Mirror of https://git.mirrors.martin98.com/https://github.com/SigNoz/signoz (synced 2025-08-12 06:29:02 +08:00)
Merge pull request #2456 from SigNoz/feat/opamp-logparing
feat: logs parsing pipeline support in opamp
Commit: 27db1b9080
@@ -210,3 +210,21 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi

    m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
    return nil
}

// UpsertLogParsingProcessor updates the agent with log parsing processors
func UpsertLogParsingProcessor(ctx context.Context, version int, rawPipelineData []byte, config map[string]interface{}, names []string) error {
    if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
        return fmt.Errorf("agent updater is busy")
    }
    defer atomic.StoreUint32(&m.lock, 0)

    // send the changes to opamp.
    configHash, err := opamp.UpsertLogsParsingProcessor(context.Background(), config, names, m.OnConfigUpdate)
    if err != nil {
        zap.S().Errorf("failed to call agent config update for log parsing processor: %v", err)
        return err
    }

    m.updateDeployStatus(ctx, ElementTypeLogPipelines, version, string(DeployInitiated), "Deployment started", configHash, string(rawPipelineData))
    return nil
}
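For orientation, a minimal caller-side sketch of the two new arguments, assumed to run in the same package as the function above. Only the map/slice shapes, the constants.LogsPPLPfx prefix, and the operator field names come from this change; the "demo" suffix, the operator values, and the surrounding variables (ctx, version, rawPipelineData) are illustrative assumptions.

    // Hedged sketch, not part of this diff: processor bodies are keyed by their
    // collector processor name so the opamp layer can upsert them and later sweep
    // stale LogsPPLPfx entries; names carries the desired order inside the logs pipeline.
    parsingProcessors := map[string]interface{}{
        constants.LogsPPLPfx + "demo": map[string]interface{}{ // "demo" suffix is an assumption
            "operators": []map[string]interface{}{
                {"type": "regex_parser", "regex": "^(?P<ts>\\S+) (?P<msg>.*)$", "parse_from": "body"},
            },
        },
    }
    names := []string{constants.LogsPPLPfx + "demo"}

    // rawPipelineData would be the JSON the UI stored for this pipeline version.
    err := UpsertLogParsingProcessor(ctx, version, rawPipelineData, parsingProcessors, names)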
pkg/query-service/app/opamp/logspipeline.go (new file, 223 lines)
@@ -0,0 +1,223 @@
package opamp

import (
    "context"
    "crypto/sha256"
    "encoding/json"
    "fmt"
    "strings"
    "sync"

    "github.com/knadh/koanf/parsers/yaml"
    "github.com/open-telemetry/opamp-go/protobufs"
    model "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
    "go.signoz.io/signoz/pkg/query-service/constants"
    "go.uber.org/zap"
)

var lockLogsPipelineSpec sync.RWMutex

func UpsertLogsParsingProcessor(ctx context.Context, parsingProcessors map[string]interface{}, parsingProcessorsNames []string, callback func(string, string, error)) (string, error) {
    confHash := ""
    if opAmpServer == nil {
        return confHash, fmt.Errorf("opamp server is down, unable to push config to agent at this moment")
    }

    agents := opAmpServer.agents.GetAllAgents()
    if len(agents) == 0 {
        return confHash, fmt.Errorf("no agents available at the moment")
    }

    for _, agent := range agents {
        config := agent.EffectiveConfig
        c, err := yaml.Parser().Unmarshal([]byte(config))
        if err != nil {
            return confHash, err
        }

        buildLogParsingProcessors(c, parsingProcessors)

        p, err := getOtelPipelinFromConfig(c)
        if err != nil {
            return confHash, err
        }
        if p.Pipelines.Logs == nil {
            return confHash, fmt.Errorf("logs pipeline doesn't exist")
        }

        // build the new processor list
        updatedProcessorList, _ := buildLogsProcessors(p.Pipelines.Logs.Processors, parsingProcessorsNames)
        p.Pipelines.Logs.Processors = updatedProcessorList

        // add the new processor list to the data (no checks required as the keys will exist)
        c["service"].(map[string]interface{})["pipelines"].(map[string]interface{})["logs"] = p.Pipelines.Logs

        updatedConf, err := yaml.Parser().Marshal(c)
        if err != nil {
            return confHash, err
        }

        // zap.S().Infof("sending new config", string(updatedConf))
        hash := sha256.New()
        _, err = hash.Write(updatedConf)
        if err != nil {
            return confHash, err
        }
        agent.EffectiveConfig = string(updatedConf)
        err = agent.Upsert()
        if err != nil {
            return confHash, err
        }

        agent.SendToAgent(&protobufs.ServerToAgent{
            RemoteConfig: &protobufs.AgentRemoteConfig{
                Config: &protobufs.AgentConfigMap{
                    ConfigMap: map[string]*protobufs.AgentConfigFile{
                        "collector.yaml": {
                            Body:        updatedConf,
                            ContentType: "application/x-yaml",
                        },
                    },
                },
                ConfigHash: hash.Sum(nil),
            },
        })

        if confHash == "" {
            confHash = string(hash.Sum(nil))
            model.ListenToConfigUpdate(agent.ID, confHash, callback)
        }
    }

    return confHash, nil
}
// buildLogParsingProcessors upserts the given log parsing processors into the
// agent config; previously added processors that are no longer present
// (identified by the LogsPPLPfx prefix) are removed.
func buildLogParsingProcessors(agentConf, parsingProcessors map[string]interface{}) error {
    agentProcessors := agentConf["processors"].(map[string]interface{})
    exists := map[string]struct{}{}
    for key, params := range parsingProcessors {
        agentProcessors[key] = params
        exists[key] = struct{}{}
    }
    // remove the old unwanted processors
    for k := range agentProcessors {
        if _, ok := exists[k]; !ok && strings.HasPrefix(k, constants.LogsPPLPfx) {
            delete(agentProcessors, k)
        }
    }
    agentConf["processors"] = agentProcessors
    return nil
}
type otelPipeline struct {
    Pipelines struct {
        Logs *struct {
            Exporters  []string `json:"exporters" yaml:"exporters"`
            Processors []string `json:"processors" yaml:"processors"`
            Receivers  []string `json:"receivers" yaml:"receivers"`
        } `json:"logs" yaml:"logs"`
    } `json:"pipelines" yaml:"pipelines"`
}

func getOtelPipelinFromConfig(config map[string]interface{}) (*otelPipeline, error) {
    if _, ok := config["service"]; !ok {
        return nil, fmt.Errorf("service not found in OTEL config")
    }
    b, err := json.Marshal(config["service"])
    if err != nil {
        return nil, err
    }
    p := otelPipeline{}
    if err := json.Unmarshal(b, &p); err != nil {
        return nil, err
    }
    return &p, nil
}
func buildLogsProcessors(current []string, logsParserPipeline []string) ([]string, error) {
    lockLogsPipelineSpec.Lock()
    defer lockLogsPipelineSpec.Unlock()

    exists := map[string]struct{}{}
    for _, v := range logsParserPipeline {
        exists[v] = struct{}{}
    }

    // remove the old processors which are no longer used
    var pipeline []string
    for _, v := range current {
        k := v
        if _, ok := exists[k]; ok || !strings.HasPrefix(k, constants.LogsPPLPfx) {
            pipeline = append(pipeline, v)
        }
    }

    // create a reverse map of existing config processors and their position
    existing := map[string]int{}
    for i, p := range pipeline {
        name := p
        existing[name] = i
    }

    // create a mapping from our logsParserPipeline to the position in the existing processors (from the current config);
    // e.g. if "batch" holds position 3 in the current effective config and 2 in our config, the map will be [2]: 3
    specVsExistingMap := map[int]int{}
    existingVsSpec := map[int]int{}

    // go through the plan and map its elements to current positions in the effective config
    for i, m := range logsParserPipeline {
        if loc, ok := existing[m]; ok {
            specVsExistingMap[i] = loc
            existingVsSpec[loc] = i
        }
    }

    lastMatched := 0
    newPipeline := []string{}

    for i := 0; i < len(logsParserPipeline); i++ {
        m := logsParserPipeline[i]
        if loc, ok := specVsExistingMap[i]; ok {
            for j := lastMatched; j < loc; j++ {
                if strings.HasPrefix(pipeline[j], constants.LogsPPLPfx) {
                    delete(specVsExistingMap, existingVsSpec[j])
                } else {
                    newPipeline = append(newPipeline, pipeline[j])
                }
            }
            newPipeline = append(newPipeline, pipeline[loc])
            lastMatched = loc + 1
        } else {
            newPipeline = append(newPipeline, m)
        }
    }
    if lastMatched < len(pipeline) {
        newPipeline = append(newPipeline, pipeline[lastMatched:]...)
    }

    if checkDuplicateString(newPipeline) {
        // duplicates most likely mean that the processor sequence in the effective config
        // conflicts with the sequence planned for the pipeline
        return pipeline, fmt.Errorf("the effective config has an unexpected processor sequence: %v", pipeline)
    }

    return newPipeline, nil
}
func checkDuplicateString(pipeline []string) bool {
    exists := make(map[string]bool, len(pipeline))
    zap.S().Debugf("checking duplicate processors in the pipeline: %v", pipeline)
    for _, processor := range pipeline {
        name := processor
        if _, ok := exists[name]; ok {
            return true
        }

        exists[name] = true
    }
    return false
}
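To make the merge order concrete, here is a short sketch using the values from the "rearrange pipelines with new processor" test case in the test file below: user-added processors keep their relative positions, while the SigNoz-managed logstransform/pipeline_* entries are rebuilt in the requested order, including entries that do not exist in the effective config yet.

    current := []string{
        "processor1", "processor2",
        constants.LogsPPLPfx + "_a",
        "processor3",
        constants.LogsPPLPfx + "_b",
        "batch",
    }
    desired := []string{
        constants.LogsPPLPfx + "_b",
        constants.LogsPPLPfx + "_a",
        constants.LogsPPLPfx + "_c", // not yet present in the effective config
    }
    merged, _ := buildLogsProcessors(current, desired) // error only on duplicate entries
    // merged == ["processor1", "processor2", "processor3",
    //            LogsPPLPfx+"_b", LogsPPLPfx+"_a", LogsPPLPfx+"_c", "batch"]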
pkg/query-service/app/opamp/logspipeline_test.go (new file, 206 lines)
@@ -0,0 +1,206 @@
package opamp

import (
    "fmt"
    "testing"

    . "github.com/smartystreets/goconvey/convey"
    "go.signoz.io/signoz/pkg/query-service/constants"
)

var buildProcessorTestData = []struct {
    Name              string
    agentConf         map[string]interface{}
    pipelineProcessor map[string]interface{}
    outputConf        map[string]interface{}
}{
    {
        Name: "Add",
        agentConf: map[string]interface{}{
            "processors": map[string]interface{}{
                "batch": struct{}{},
            },
        },
        pipelineProcessor: map[string]interface{}{
            constants.LogsPPLPfx + "b": struct{}{},
        },
        outputConf: map[string]interface{}{
            "processors": map[string]interface{}{
                constants.LogsPPLPfx + "b": struct{}{},
                "batch":                    struct{}{},
            },
        },
    },
    {
        Name: "Remove",
        agentConf: map[string]interface{}{
            "processors": map[string]interface{}{
                constants.LogsPPLPfx + "b": struct{}{},
                "batch":                    struct{}{},
            },
        },
        pipelineProcessor: map[string]interface{}{},
        outputConf: map[string]interface{}{
            "processors": map[string]interface{}{
                "batch": struct{}{},
            },
        },
    },
    {
        Name: "remove and upsert 1",
        agentConf: map[string]interface{}{
            "processors": map[string]interface{}{
                constants.LogsPPLPfx + "a": struct{}{},
                constants.LogsPPLPfx + "b": struct{}{},
                "batch":                    struct{}{},
            },
        },
        pipelineProcessor: map[string]interface{}{
            constants.LogsPPLPfx + "b": struct{}{},
        },
        outputConf: map[string]interface{}{
            "processors": map[string]interface{}{
                constants.LogsPPLPfx + "b": struct{}{},
                "batch":                    struct{}{},
            },
        },
    },
    {
        Name: "remove and upsert 2",
        agentConf: map[string]interface{}{
            "processors": map[string]interface{}{
                "memorylimiter":            struct{}{},
                constants.LogsPPLPfx + "a": struct{}{},
                constants.LogsPPLPfx + "b": struct{}{},
                "batch":                    struct{}{},
            },
        },
        pipelineProcessor: map[string]interface{}{
            constants.LogsPPLPfx + "b": struct{}{},
        },
        outputConf: map[string]interface{}{
            "processors": map[string]interface{}{
                "memorylimiter":            struct{}{},
                constants.LogsPPLPfx + "b": struct{}{},
                "batch":                    struct{}{},
            },
        },
    },
}
func TestBuildLogParsingProcessors(t *testing.T) {
    for _, test := range buildProcessorTestData {
        Convey(test.Name, t, func() {
            err := buildLogParsingProcessors(test.agentConf, test.pipelineProcessor)
            So(err, ShouldBeNil)
            So(test.agentConf, ShouldResemble, test.outputConf)
        })
    }
}
var BuildLogsPipelineTestData = []struct {
    Name             string
    currentPipeline  []string
    logsPipeline     []string
    expectedPipeline []string
}{
    {
        Name:             "Add new pipelines",
        currentPipeline:  []string{"processor1", "processor2"},
        logsPipeline:     []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b"},
        expectedPipeline: []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
    },
    {
        Name:             "Add new pipeline and respect custom processors",
        currentPipeline:  []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
        logsPipeline:     []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
        expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "processor2"},
    },
    {
        Name:             "Add new pipeline and respect custom processors",
        currentPipeline:  []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", "processor2"},
        logsPipeline:     []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d"},
        expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "d", "processor2"},
    },
    {
        Name:             "Add new pipeline and respect custom processors in the beginning and middle",
        currentPipeline:  []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "batch"},
        logsPipeline:     []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c"},
        expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", constants.LogsPPLPfx + "c", "batch"},
    },
    {
        Name:             "Remove old pipeline and add new",
        currentPipeline:  []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "b", "processor1", "processor2"},
        logsPipeline:     []string{constants.LogsPPLPfx + "a"},
        expectedPipeline: []string{constants.LogsPPLPfx + "a", "processor1", "processor2"},
    },
    {
        Name:             "Remove old pipeline from middle",
        currentPipeline:  []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
        logsPipeline:     []string{constants.LogsPPLPfx + "a"},
        expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", "batch"},
    },
    {
        Name:             "Remove old pipeline from middle and add new pipeline",
        currentPipeline:  []string{"processor1", "processor2", constants.LogsPPLPfx + "a", "processor3", constants.LogsPPLPfx + "b", "batch"},
        logsPipeline:     []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c"},
        expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "c", "processor3", "batch"},
    },
    {
        Name:             "Remove multiple old pipelines from middle and add multiple new ones",
        currentPipeline:  []string{"processor1", constants.LogsPPLPfx + "a", "processor2", constants.LogsPPLPfx + "b", "processor3", constants.LogsPPLPfx + "c", "processor4", constants.LogsPPLPfx + "d", "processor5", "batch"},
        logsPipeline:     []string{constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1"},
        expectedPipeline: []string{"processor1", constants.LogsPPLPfx + "a", constants.LogsPPLPfx + "a1", "processor2", "processor3", constants.LogsPPLPfx + "c", constants.LogsPPLPfx + "c1", "processor4", "processor5", "batch"},
    },

    // working
    {
        Name:             "rearrange pipelines",
        currentPipeline:  []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
        logsPipeline:     []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a"},
        expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch"},
    },
    {
        Name:             "rearrange pipelines with new processor",
        currentPipeline:  []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
        logsPipeline:     []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c"},
        expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
        // expectedPipeline: []string{"processor1", "processor2", constants.LogsPPLPfx + "_b", "processor3", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_c", "batch"},
    },
    {
        Name:             "delete processor",
        currentPipeline:  []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch"},
        logsPipeline:     []string{},
        expectedPipeline: []string{"processor1", "processor2", "processor3", "batch"},
    },
    {
        Name:             "last to first",
        currentPipeline:  []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", "processor4", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c"},
        logsPipeline:     []string{constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
        expectedPipeline: []string{"processor1", "processor2", "processor3", "processor4", "batch", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_b"},
    },
    {
        Name:             "multiple rearrange pipelines",
        currentPipeline:  []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
        logsPipeline:     []string{constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
        expectedPipeline: []string{"processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
    },
    {
        Name:             "multiple rearrange with new pipelines",
        currentPipeline:  []string{"processor1", "processor2", constants.LogsPPLPfx + "_a", "processor3", constants.LogsPPLPfx + "_b", "batch", constants.LogsPPLPfx + "_c", "processor4", "processor5", constants.LogsPPLPfx + "_d", "processor6", "processor7"},
        logsPipeline:     []string{constants.LogsPPLPfx + "_z", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e"},
        expectedPipeline: []string{constants.LogsPPLPfx + "_z", "processor1", "processor2", "processor3", constants.LogsPPLPfx + "_b", constants.LogsPPLPfx + "_a", "batch", "processor4", "processor5", constants.LogsPPLPfx + "_d", constants.LogsPPLPfx + "_c", constants.LogsPPLPfx + "_e", "processor6", "processor7"},
    },
}
func TestBuildLogsPipeline(t *testing.T) {
    for _, test := range BuildLogsPipelineTestData {
        Convey(test.Name, t, func() {
            v, err := buildLogsProcessors(test.currentPipeline, test.logsPipeline)
            So(err, ShouldBeNil)
            fmt.Println(test.Name, "\n", test.currentPipeline, "\n", v, "\n", test.expectedPipeline)
            So(v, ShouldResemble, test.expectedPipeline)
        })
    }
}
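These tests can be run on their own with the standard Go tooling from the repository root, for example:

    go test ./pkg/query-service/app/opamp/ -run 'TestBuildLogParsingProcessors|TestBuildLogsPipeline' -v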
@@ -2,15 +2,10 @@ package opamp

 import (
 	"context"
-	"crypto/sha256"
-	"strings"
-
-	"github.com/knadh/koanf/parsers/yaml"
 	"github.com/open-telemetry/opamp-go/protobufs"
 	"github.com/open-telemetry/opamp-go/server"
 	"github.com/open-telemetry/opamp-go/server/types"
-	"go.opentelemetry.io/collector/confmap"
 	model "go.signoz.io/signoz/pkg/query-service/app/opamp/model"
 	"go.uber.org/zap"
@@ -112,93 +107,3 @@ func Ready() bool {

 func Subscribe(agentId string, hash string, f model.OnChangeCallback) {
 	model.ListenToConfigUpdate(agentId, hash, f)
 }
-
-func UpsertProcessor(ctx context.Context, processors map[string]interface{}, names []interface{}) error {
-	x := map[string]interface{}{
-		"processors": processors,
-	}
-
-	newConf := confmap.NewFromStringMap(x)
-
-	agents := opAmpServer.agents.GetAllAgents()
-	for _, agent := range agents {
-		config := agent.EffectiveConfig
-		c, err := yaml.Parser().Unmarshal([]byte(config))
-		if err != nil {
-			return err
-		}
-		agentConf := confmap.NewFromStringMap(c)
-
-		err = agentConf.Merge(newConf)
-		if err != nil {
-			return err
-		}
-
-		service := agentConf.Get("service")
-
-		logs := service.(map[string]interface{})["pipelines"].(map[string]interface{})["logs"]
-		processors := logs.(map[string]interface{})["processors"].([]interface{})
-		userProcessors := []interface{}{}
-		// remove old ones
-		for _, v := range processors {
-			if !strings.HasPrefix(v.(string), "logstransform/pipeline_") {
-				userProcessors = append(userProcessors, v)
-			}
-		}
-		// all user processors are pushed after pipelines
-		processors = append(names, userProcessors...)
-
-		service.(map[string]interface{})["pipelines"].(map[string]interface{})["logs"].(map[string]interface{})["processors"] = processors
-
-		s := map[string]interface{}{
-			"service": map[string]interface{}{
-				"pipelines": map[string]interface{}{
-					"logs": map[string]interface{}{
-						"processors": processors,
-					},
-				},
-			},
-		}
-
-		serviceC := confmap.NewFromStringMap(s)
-
-		err = agentConf.Merge(serviceC)
-		if err != nil {
-			return err
-		}
-
-		// ------ complete adding processor
-		configR, err := yaml.Parser().Marshal(agentConf.ToStringMap())
-		if err != nil {
-			return err
-		}
-
-		zap.S().Infof("sending new config", string(configR))
-		hash := sha256.New()
-		_, err = hash.Write(configR)
-		if err != nil {
-			return err
-		}
-		agent.EffectiveConfig = string(configR)
-		err = agent.Upsert()
-		if err != nil {
-			return err
-		}
-
-		agent.SendToAgent(&protobufs.ServerToAgent{
-			RemoteConfig: &protobufs.AgentRemoteConfig{
-				Config: &protobufs.AgentConfigMap{
-					ConfigMap: map[string]*protobufs.AgentConfigFile{
-						"collector.yaml": {
-							Body:        configR,
-							ContentType: "application/x-yaml",
-						},
-					},
-				},
-				ConfigHash: hash.Sum(nil),
-			},
-		})
-	}
-
-	return nil
-}
@@ -220,3 +220,6 @@ const (
    NumberTagMapCol = "numberTagMap"
    BoolTagMapCol   = "boolTagMap"
)

// LogsPPLPfx is a short constant for logsPipelinePrefix
const LogsPPLPfx = "logstransform/pipeline_"
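For context, this prefix is what both the removed UpsertProcessor above and the new buildLogsProcessors use to tell SigNoz-managed log parsing processors apart from user-defined ones. A tiny illustration follows; the "demo" suffix is an assumption, only the prefix itself is defined in this change.

    name := constants.LogsPPLPfx + "demo"                      // "logstransform/pipeline_demo"
    isManaged := strings.HasPrefix(name, constants.LogsPPLPfx) // true for SigNoz-managed entries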
pkg/query-service/model/logparsingpipeline.go (new file, 95 lines)
@@ -0,0 +1,95 @@
package model

import (
    "encoding/json"
    "time"

    "github.com/pkg/errors"
)

// Pipeline is stored and also deployed finally to collector config
type Pipeline struct {
    Id          string  `json:"id,omitempty" db:"id"`
    OrderId     int     `json:"orderId" db:"order_id"`
    Name        string  `json:"name,omitempty" db:"name"`
    Alias       string  `json:"alias" db:"alias"`
    Description *string `json:"description" db:"description"`
    Enabled     bool    `json:"enabled" db:"enabled"`
    Filter      string  `json:"filter" db:"filter"`

    // configuration for pipeline
    RawConfig string `db:"config_json" json:"-"`

    Config []PipelineOperator `json:"config"`

    // Updater not required as any change will result in a new version
    Creator
}

type Creator struct {
    CreatedBy string    `json:"createdBy" db:"created_by"`
    CreatedAt time.Time `json:"createdAt" db:"created_at"`
}

type Processor struct {
    Operators []PipelineOperator `json:"operators" yaml:"operators"`
}

type PipelineOperator struct {
    Type    string `json:"type" yaml:"type"`
    ID      string `json:"id,omitempty" yaml:"id,omitempty"`
    Output  string `json:"output,omitempty" yaml:"output,omitempty"`
    OnError string `json:"on_error,omitempty" yaml:"on_error,omitempty"`

    // the following are not needed in the final config
    OrderId int    `json:"orderId" yaml:"-"`
    Enabled bool   `json:"enabled" yaml:"-"`
    Name    string `json:"name,omitempty" yaml:"-"`

    // optional keys depending on the type
    ParseTo     string           `json:"parse_to,omitempty" yaml:"parse_to,omitempty"`
    Pattern     string           `json:"pattern,omitempty" yaml:"pattern,omitempty"`
    Regex       string           `json:"regex,omitempty" yaml:"regex,omitempty"`
    ParseFrom   string           `json:"parse_from,omitempty" yaml:"parse_from,omitempty"`
    Timestamp   *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
    TraceParser *TraceParser     `json:"trace_parser,omitempty" yaml:"trace_parser,omitempty"`
    Field       string           `json:"field,omitempty" yaml:"field,omitempty"`
    Value       string           `json:"value,omitempty" yaml:"value,omitempty"`
    From        string           `json:"from,omitempty" yaml:"from,omitempty"`
    To          string           `json:"to,omitempty" yaml:"to,omitempty"`
    Expr        string           `json:"expr,omitempty" yaml:"expr,omitempty"`
    Routes      *[]Route         `json:"routes,omitempty" yaml:"routes,omitempty"`
    Fields      []string         `json:"fields,omitempty" yaml:"fields,omitempty"`
    Default     string           `json:"default,omitempty" yaml:"default,omitempty"`
}

type TimestampParser struct {
    Layout     string `json:"layout" yaml:"layout"`
    LayoutType string `json:"layout_type" yaml:"layout_type"`
    ParseFrom  string `json:"parse_from" yaml:"parse_from"`
}

type TraceParser struct {
    TraceId    *ParseFrom `json:"trace_id,omitempty" yaml:"trace_id,omitempty"`
    SpanId     *ParseFrom `json:"span_id,omitempty" yaml:"span_id,omitempty"`
    TraceFlags *ParseFrom `json:"trace_flags,omitempty" yaml:"trace_flags,omitempty"`
}

type ParseFrom struct {
    ParseFrom string `json:"parse_from" yaml:"parse_from"`
}

type Route struct {
    Output string `json:"output" yaml:"output"`
    Expr   string `json:"expr" yaml:"expr"`
}
func (i *Pipeline) ParseRawConfig() error {
    c := []PipelineOperator{}
    err := json.Unmarshal([]byte(i.RawConfig), &c)
    if err != nil {
        return errors.Wrap(err, "failed to parse pipeline config")
    }
    i.Config = c
    return nil
}
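To show how the stored JSON maps onto these structs, here is a small hedged sketch; the alias and operator values are invented for illustration, and the JSON keys simply follow the tags on PipelineOperator above.

    package main

    import (
        "fmt"
        "log"

        "go.signoz.io/signoz/pkg/query-service/model"
    )

    func main() {
        p := model.Pipeline{
            Alias: "demo", // illustrative value
            // RawConfig holds the operator list as JSON (the config_json column)
            RawConfig: `[{"type": "regex_parser", "id": "parse", "regex": "^(?P<ts>\\S+) (?P<msg>.*)$", "parse_from": "body", "orderId": 1, "enabled": true}]`,
        }
        if err := p.ParseRawConfig(); err != nil {
            log.Fatal(err) // RawConfig was not valid JSON for []PipelineOperator
        }
        fmt.Println(p.Config[0].Type, p.Config[0].ParseFrom) // regex_parser body
    }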