Feat: QS: logs pipelines severity parsing processor (#4132)

* chore: update test helper for making logs

* chore: add happy case test for severity parser

* feat: get severity parsing processor working and add more happy path tests

* chore: add test for validating severity parser doesn't spam collector logs

* chore: add if condition to generated severity_parser operators

* chore: add postablePipeline validation for severity parser

* chore: minor cleanup in tests

* chore: allow trace and fatal in severity mappings
This commit is contained in:
Raj Kamal Singh 2023-12-01 17:22:22 +05:30 committed by GitHub
parent 3e29161fea
commit aad44a1037
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 322 additions and 19 deletions

View File

@ -66,6 +66,10 @@ type PipelineOperator struct {
// time_parser fields.
Layout string `json:"layout,omitempty" yaml:"layout,omitempty"`
LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"`
// severity parser fields
SeverityMapping map[string][]string `json:"mapping,omitempty" yaml:"mapping,omitempty"`
OverwriteSeverityText bool `json:"overwrite_text,omitempty" yaml:"overwrite_text,omitempty"`
}
type TimestampParser struct {

View File

@ -138,6 +138,16 @@ func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
}
// TODO(Raj): Maybe add support for gotime too eventually
} else if operator.Type == "severity_parser" {
parseFromParts := strings.Split(operator.ParseFrom, ".")
parseFromPath := strings.Join(parseFromParts, "?.")
operator.If = fmt.Sprintf(
`%s != nil && ( type(%s) == "string" || ( type(%s) in ["int", "float"] && %s == float(int(%s)) ) )`,
parseFromPath, parseFromPath, parseFromPath, parseFromPath, parseFromPath,
)
}
filteredOp = append(filteredOp, operator)

View File

@ -198,6 +198,18 @@ func isValidOperator(op PipelineOperator) error {
}
}
case "severity_parser":
if op.ParseFrom == "" {
return fmt.Errorf("parse from of severity parsing processor %s cannot be empty", op.ID)
}
validMappingLevels := []string{"trace", "debug", "info", "warn", "error", "fatal"}
for k, _ := range op.SeverityMapping {
if !slices.Contains(validMappingLevels, strings.ToLower(k)) {
return fmt.Errorf("%s is not a valid severity in processor %s", k, op.ID)
}
}
default:
return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, trace_parser, retain)", op.Type, op.ID))
}

View File

@ -326,6 +326,40 @@ var operatorTest = []struct {
Layout: "%U",
},
IsValid: false,
}, {
Name: "Severity Parser - valid",
Operator: PipelineOperator{
ID: "severity",
Type: "severity_parser",
ParseFrom: "attributes.test_severity",
SeverityMapping: map[string][]string{
"trace": {"test_trace"},
"fatal": {"test_fatal"},
},
OverwriteSeverityText: true,
},
IsValid: true,
}, {
Name: "Severity Parser - Parse from is required",
Operator: PipelineOperator{
ID: "severity",
Type: "severity_parser",
SeverityMapping: map[string][]string{},
OverwriteSeverityText: true,
},
IsValid: false,
}, {
Name: "Severity Parser - mapping level must be valid",
Operator: PipelineOperator{
ID: "severity",
Type: "severity_parser",
ParseFrom: "attributes.test",
SeverityMapping: map[string][]string{
"not-a-level": {"bad-level"},
},
OverwriteSeverityText: true,
},
IsValid: false,
},
}

View File

@ -3,6 +3,7 @@ package logparsingpipeline
import (
"context"
"encoding/json"
"fmt"
"strconv"
"testing"
"time"
@ -91,15 +92,15 @@ func TestPipelinePreview(t *testing.T) {
},
}
matchingLog := makeTestLogEntry(
matchingLog := makeTestSignozLog(
"test log body",
map[string]string{
map[string]interface{}{
"method": "GET",
},
)
nonMatchingLog := makeTestLogEntry(
nonMatchingLog := makeTestSignozLog(
"test log body",
map[string]string{
map[string]interface{}{
"method": "POST",
},
)
@ -184,9 +185,9 @@ func TestGrokParsingProcessor(t *testing.T) {
},
}
testLog := makeTestLogEntry(
testLog := makeTestSignozLog(
"2023-10-26T04:38:00.602Z INFO route/server.go:71 HTTP request received",
map[string]string{
map[string]interface{}{
"method": "GET",
},
)
@ -314,18 +315,39 @@ func TestTraceParsingProcessor(t *testing.T) {
require.Equal("", result[0].SpanID)
}
func makeTestLogEntry(
func makeTestSignozLog(
body string,
attributes map[string]string,
attributes map[string]interface{},
) model.SignozLog {
return model.SignozLog{
Timestamp: uint64(time.Now().UnixNano()),
Body: body,
Attributes_string: attributes,
Resources_string: map[string]string{},
SeverityText: entry.Info.String(),
SeverityNumber: uint8(entry.Info),
SpanID: uuid.New().String(),
TraceID: uuid.New().String(),
testLog := model.SignozLog{
Timestamp: uint64(time.Now().UnixNano()),
Body: body,
Attributes_bool: map[string]bool{},
Attributes_string: map[string]string{},
Attributes_int64: map[string]int64{},
Attributes_float64: map[string]float64{},
Resources_string: map[string]string{},
SeverityText: entry.Info.String(),
SeverityNumber: uint8(entry.Info),
SpanID: uuid.New().String(),
TraceID: uuid.New().String(),
}
for k, v := range attributes {
switch v.(type) {
case bool:
testLog.Attributes_bool[k] = v.(bool)
case string:
testLog.Attributes_string[k] = v.(string)
case int:
testLog.Attributes_int64[k] = int64(v.(int))
case float64:
testLog.Attributes_float64[k] = v.(float64)
default:
panic(fmt.Sprintf("found attribute value of unsupported type %T in test log", v))
}
}
return testLog
}

View File

@ -0,0 +1,221 @@
package logparsingpipeline
import (
"context"
"encoding/json"
"strings"
"testing"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// TestSeverityParsingProcessor verifies that a severity_parser pipeline
// operator sets the expected severity text and number on matching logs,
// covering mapped string values (case-insensitively), the level names
// themselves, and numeric values falling in mapped ranges (eg: 2xx).
func TestSeverityParsingProcessor(t *testing.T) {
	require := require.New(t)

	// Pipeline matching logs with attribute method == "GET".
	testPipelines := []Pipeline{
		{
			OrderId: 1,
			Name:    "pipeline1",
			Alias:   "pipeline1",
			Enabled: true,
			Filter: &v3.FilterSet{
				Operator: "AND",
				Items: []v3.FilterItem{
					{
						Key: v3.AttributeKey{
							Key:      "method",
							DataType: v3.AttributeKeyDataTypeString,
							Type:     v3.AttributeKeyTypeTag,
						},
						Operator: "=",
						Value:    "GET",
					},
				},
			},
			Config: []PipelineOperator{},
		},
	}

	var severityParserOp PipelineOperator
	err := json.Unmarshal([]byte(`
		{
			"orderId": 1,
			"enabled": true,
			"type": "severity_parser",
			"name": "Test severity parser",
			"id": "test-severity-parser",
			"parse_from": "attributes.test_severity",
			"mapping": {
				"trace": ["test_trace"],
				"debug": ["test_debug", "2xx"],
				"info": ["test_info", "3xx"],
				"warn": ["test_warn", "4xx"],
				"error": ["test_error", "5xx"],
				"fatal": ["test_fatal"]
			},
			"overwrite_text": true
		}
	`), &severityParserOp)
	require.Nil(err)
	testPipelines[0].Config = append(testPipelines[0].Config, severityParserOp)

	// Every value in severityValues is expected to parse to the same
	// severity text and number.
	testCases := []struct {
		severityValues         []interface{}
		expectedSeverityText   string
		expectedSeverityNumber uint8
	}{
		{
			severityValues: []interface{}{
				"test_trace", "TEST_TRACE", "trace", "Trace",
			},
			expectedSeverityText:   "TRACE",
			expectedSeverityNumber: 1,
		},
		{
			severityValues: []interface{}{
				"test_debug", "TEST_DEBUG", "debug", "DEBUG", 202.0,
			},
			expectedSeverityText:   "DEBUG",
			expectedSeverityNumber: 5,
		}, {
			severityValues: []interface{}{
				"test_info", "TEST_INFO", "info", "INFO", 302.0,
			},
			expectedSeverityText:   "INFO",
			expectedSeverityNumber: 9,
		}, {
			severityValues: []interface{}{
				"test_warn", "TEST_WARN", "warn", "WARN", 404.0,
			},
			expectedSeverityText:   "WARN",
			expectedSeverityNumber: 13,
		}, {
			severityValues: []interface{}{
				"test_error", "TEST_ERROR", "error", "ERROR", 500.0,
			},
			expectedSeverityText:   "ERROR",
			expectedSeverityNumber: 17,
		}, {
			severityValues: []interface{}{
				"test_fatal", "TEST_FATAL", "fatal", "FATAL",
			},
			expectedSeverityText:   "FATAL",
			expectedSeverityNumber: 21,
		},
	}

	for _, testCase := range testCases {
		inputLogs := []model.SignozLog{}
		for _, severityAttribValue := range testCase.severityValues {
			inputLogs = append(inputLogs, makeTestSignozLog(
				"test log",
				map[string]interface{}{
					"method":        "GET",
					"test_severity": severityAttribValue,
				},
			))
		}

		result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing(
			context.Background(),
			testPipelines,
			inputLogs,
		)
		require.Nil(err)
		require.Equal(len(inputLogs), len(result))
		require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))

		// Every severity value in this test case maps to the same expected
		// severity, so validate all processed logs — not just the first one.
		for _, processed := range result {
			require.Equal(testCase.expectedSeverityNumber, processed.SeverityNumber)
			require.Equal(testCase.expectedSeverityText, processed.SeverityText)
		}
	}
}
// TestNoCollectorErrorsFromSeverityParserForMismatchedLogs verifies that
// the severity parser stays quiet (no collector warn/error logs) when a
// pipeline-matching log either lacks the parse_from field or carries a
// value the parser cannot map, and that the log still flows through.
func TestNoCollectorErrorsFromSeverityParserForMismatchedLogs(t *testing.T) {
	require := require.New(t)

	// Pipelines in these scenarios match logs with attribute method == "GET".
	testPipelineFilter := &v3.FilterSet{
		Operator: "AND",
		Items: []v3.FilterItem{
			{
				Key: v3.AttributeKey{
					Key:      "method",
					DataType: v3.AttributeKeyDataTypeString,
					Type:     v3.AttributeKeyTypeTag,
				},
				Operator: "=",
				Value:    "GET",
			},
		},
	}

	makeTestPipeline := func(config []PipelineOperator) Pipeline {
		return Pipeline{
			OrderId: 1,
			Name:    "pipeline1",
			Alias:   "pipeline1",
			Enabled: true,
			Filter:  testPipelineFilter,
			Config:  config,
		}
	}

	// Both scenarios use an identical severity parser; they differ only in
	// the shape of the log that must be ignored without noise.
	severityParser := PipelineOperator{
		ID:        "severity",
		Type:      "severity_parser",
		Enabled:   true,
		Name:      "severity parser",
		ParseFrom: "attributes.test_severity",
		SeverityMapping: map[string][]string{
			"debug": {"debug"},
		},
		OverwriteSeverityText: true,
	}

	testCases := []struct {
		Name           string
		Operator       PipelineOperator
		NonMatchingLog model.SignozLog
	}{
		{
			Name:     "severity parser should ignore logs with missing field",
			Operator: severityParser,
			NonMatchingLog: makeTestSignozLog("mismatching log", map[string]interface{}{
				"method": "GET",
			}),
		},
		{
			Name:     "severity parser should ignore logs with invalid values.",
			Operator: severityParser,
			NonMatchingLog: makeTestSignozLog("mismatching log", map[string]interface{}{
				"method":        "GET",
				"test_severity": 200.3,
			}),
		},
	}

	for _, tc := range testCases {
		pipelines := []Pipeline{makeTestPipeline([]PipelineOperator{tc.Operator})}

		result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing(
			context.Background(),
			pipelines,
			[]model.SignozLog{tc.NonMatchingLog},
		)
		require.Nil(err)
		require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n"))
		require.Equal(1, len(result))
	}
}

View File

@ -108,9 +108,9 @@ func TestTimestampParsingProcessor(t *testing.T) {
testPipelines[0].Config = append(testPipelines[0].Config, timestampParserOp)
testTimestampStr := "2023-11-27T12:03:28.239907+0530"
testLog := makeTestLogEntry(
testLog := makeTestSignozLog(
"test log",
map[string]string{
map[string]interface{}{
"method": "GET",
"test_timestamp": testTimestampStr,
},