Feat: query service: logs pipelines timestamp parsing processor (#4105)

* chore: relocate tests for trace and grok parsing processor

* chore: add test for timestamp parsing processor

* feat: update PipelineOperator model for time parser fields and get tests passing

* chore: add test cases for validating time parser fails silently on mismatched logs

* chore: add helper for generating regex for strptime layouts

* feat: time_parser ignore logs whose parseFrom value doesn't match strptime layout

* chore: escape regex special chars if any in the layout string before preparing regex

* chore: add operator.If on time_parser when using layout type epoch

* chore: finish up with operator.If on time_parser for  layout type

* chore: postable pipeline validation for time parser

* chore: some cleanup

* chore: some more cleanup

* chore: add validation for strptime layouts in postable pipelines

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
Raj Kamal Singh 2023-11-29 18:55:01 +05:30 committed by GitHub
parent 1f0fdfd403
commit 1b6b3c2fdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 463 additions and 13 deletions

View File

@ -49,11 +49,10 @@ type PipelineOperator struct {
Name string `json:"name,omitempty" yaml:"-"`
// optional keys depending on the type
ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"`
Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"`
Regex string `json:"regex,omitempty" yaml:"regex,omitempty"`
ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"`
Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"`
Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"`
Regex string `json:"regex,omitempty" yaml:"regex,omitempty"`
ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"`
*TraceParser `yaml:",inline,omitempty"`
Field string `json:"field,omitempty" yaml:"field,omitempty"`
Value string `json:"value,omitempty" yaml:"value,omitempty"`
@ -63,6 +62,10 @@ type PipelineOperator struct {
Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"`
Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"`
Default string `json:"default,omitempty" yaml:"default,omitempty"`
// time_parser fields.
Layout string `json:"layout,omitempty" yaml:"layout,omitempty"`
LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"`
}
type TimestampParser struct {

View File

@ -25,7 +25,11 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
continue
}
operators := getOperators(v.Config)
operators, err := getOperators(v.Config)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to prepare operators")
}
if len(operators) == 0 {
continue
}
@ -68,7 +72,7 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
return processors, names, nil
}
func getOperators(ops []PipelineOperator) []PipelineOperator {
func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
filteredOp := []PipelineOperator{}
for i, operator := range ops {
if operator.Enabled {
@ -106,6 +110,34 @@ func getOperators(ops []PipelineOperator) []PipelineOperator {
} else if operator.Type == "trace_parser" {
cleanTraceParser(&operator)
} else if operator.Type == "time_parser" {
parseFromParts := strings.Split(operator.ParseFrom, ".")
parseFromPath := strings.Join(parseFromParts, "?.")
operator.If = fmt.Sprintf(`%s != nil`, parseFromPath)
if operator.LayoutType == "strptime" {
regex, err := RegexForStrptimeLayout(operator.Layout)
if err != nil {
return nil, fmt.Errorf("could not generate time_parser processor: %w", err)
}
operator.If = fmt.Sprintf(
`%s && %s matches "%s"`, operator.If, parseFromPath, regex,
)
} else if operator.LayoutType == "epoch" {
valueRegex := `^\\s*[0-9]+\\s*$`
if strings.Contains(operator.Layout, ".") {
valueRegex = `^\\s*[0-9]+\\.[0-9]+\\s*$`
}
operator.If = fmt.Sprintf(
`%s && string(%s) matches "%s"`, operator.If, parseFromPath, valueRegex,
)
}
// TODO(Raj): Maybe add support for gotime too eventually
}
filteredOp = append(filteredOp, operator)
@ -113,7 +145,7 @@ func getOperators(ops []PipelineOperator) []PipelineOperator {
filteredOp[len(filteredOp)-1].Output = ""
}
}
return filteredOp
return filteredOp, nil
}
func cleanTraceParser(operator *PipelineOperator) {

View File

@ -2,6 +2,7 @@ package logparsingpipeline
import (
"context"
"fmt"
"strings"
"testing"
"time"
@ -198,7 +199,8 @@ var prepareProcessorTestData = []struct {
func TestPreparePipelineProcessor(t *testing.T) {
for _, test := range prepareProcessorTestData {
Convey(test.Name, t, func() {
res := getOperators(test.Operators)
res, err := getOperators(test.Operators)
So(err, ShouldBeNil)
So(res, ShouldResemble, test.Output)
})
}
@ -256,11 +258,13 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) {
}
}
testCases := []struct {
type pipelineTestCase struct {
Name string
Operator PipelineOperator
NonMatchingLog model.SignozLog
}{
}
testCases := []pipelineTestCase{
{
"regex processor should ignore log with missing field",
PipelineOperator{
@ -342,12 +346,82 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) {
Field: "attributes.test",
},
makeTestLog("mismatching log", map[string]string{}),
}, {
"time parser should ignore logs with missing field.",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%Y-%m-%dT%H:%M:%S.%f%z",
},
makeTestLog("mismatching log", map[string]string{}),
}, {
"time parser should ignore logs timestamp values that don't contain expected strptime layout.",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%Y-%m-%dT%H:%M:%S.%f%z",
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": "2023-11-27T12:03:28A239907+0530",
}),
}, {
"time parser should ignore logs timestamp values that don't contain an epoch",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "s",
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": "not-an-epoch",
}),
},
// TODO(Raj): see if there is an error scenario for grok parser.
// TODO(Raj): see if there is an error scenario for trace parser.
// TODO(Raj): see if there is an error scenario for Add operator.
}
// Some more timeparser test cases
epochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"}
epochTestValues := []string{
"1136214245", "1136214245123", "1136214245123456",
"1136214245123456789", "1136214245.123",
"1136214245.123456", "1136214245.123456789",
}
for _, epochLayout := range epochLayouts {
for _, testValue := range epochTestValues {
testCases = append(testCases, pipelineTestCase{
fmt.Sprintf(
"time parser should ignore log with timestamp value %s that doesn't match layout type %s",
testValue, epochLayout,
),
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: epochLayout,
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": testValue,
}),
})
}
}
for _, testCase := range testCases {
testPipelines := []Pipeline{makeTestPipeline([]PipelineOperator{testCase.Operator})}

View File

@ -8,6 +8,7 @@ import (
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
"golang.org/x/exp/slices"
)
// PostablePipelines are a list of user defined pipelines
@ -164,6 +165,39 @@ func isValidOperator(op PipelineOperator) error {
if len(op.Fields) == 0 {
return fmt.Errorf(fmt.Sprintf("fields of %s retain operator cannot be empty", op.ID))
}
case "time_parser":
	// A time parser has to be told which field holds the raw timestamp.
	if op.ParseFrom == "" {
		return fmt.Errorf("parse from of time parsing processor %s cannot be empty", op.ID)
	}

	// Only epoch and strptime layout types are accepted.
	if op.LayoutType != "epoch" && op.LayoutType != "strptime" {
		// TODO(Raj): Maybe add support for gotime format
		return fmt.Errorf(
			"invalid format type '%s' of time parsing processor %s", op.LayoutType, op.ID,
		)
	}

	// Pass the arguments to Errorf directly instead of pre-formatting with
	// Sprintf, so a '%' inside the operator ID can't be misread as a verb.
	if op.Layout == "" {
		return fmt.Errorf("format can not be empty for time parsing processor %s", op.ID)
	}

	validEpochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"}
	if op.LayoutType == "epoch" && !slices.Contains(validEpochLayouts, op.Layout) {
		// Report the rejected layout itself, not the layout type (which is
		// always "epoch" on this path).
		return fmt.Errorf(
			"invalid epoch format '%s' of time parsing processor %s", op.Layout, op.ID,
		)
	}

	// TODO(Raj): Add validation for strptime layouts via
	// collector simulator maybe.
	if op.LayoutType == "strptime" {
		// A layout is considered valid when a guard regex can be derived
		// from it, i.e. every directive it uses is a known one.
		if _, err := RegexForStrptimeLayout(op.Layout); err != nil {
			return fmt.Errorf(
				"invalid strptime format '%s' of time parsing processor %s: %w", op.Layout, op.ID, err,
			)
		}
	}
default:
return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, trace_parser, retain)", op.Type, op.ID))
}

View File

@ -275,6 +275,57 @@ var operatorTest = []struct {
},
},
IsValid: false,
}, {
Name: "Timestamp Parser - valid",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "s",
},
IsValid: true,
}, {
Name: "Timestamp Parser - invalid - bad parsefrom attribute",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "timestamp",
LayoutType: "epoch",
Layout: "s",
},
IsValid: false,
}, {
Name: "Timestamp Parser - unsupported layout_type",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
// TODO(Raj): Maybe add support for gotime format
LayoutType: "gotime",
Layout: "Mon Jan 2 15:04:05 -0700 MST 2006",
},
IsValid: false,
}, {
Name: "Timestamp Parser - invalid epoch layout",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "%Y-%m-%d",
},
IsValid: false,
}, {
Name: "Timestamp Parser - invalid strptime layout",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%U",
},
IsValid: false,
},
}

View File

@ -145,7 +145,7 @@ func TestPipelinePreview(t *testing.T) {
}
func TestGrokParsingPreview(t *testing.T) {
func TestGrokParsingProcessor(t *testing.T) {
require := require.New(t)
testPipelines := []Pipeline{
@ -207,7 +207,7 @@ func TestGrokParsingPreview(t *testing.T) {
require.Equal("route/server.go:71", processed.Attributes_string["location"])
}
func TestTraceParsingPreview(t *testing.T) {
func TestTraceParsingProcessor(t *testing.T) {
require := require.New(t)
testPipelines := []Pipeline{

View File

@ -0,0 +1,120 @@
package logparsingpipeline
import (
"errors"
"fmt"
"regexp"
"strings"
)
// Regex for strptime format placeholders supported by the time parser.
// Used for defining if conditions on time parsing operators so they do not
// spam collector logs when encountering values that can't be parsed.
//
// Based on ctimeSubstitutes defined in https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/timeutils/internal/ctimefmt/ctimefmt.go#L22
//
// TODO(Raj): Maybe make the expressions tighter.
var ctimeRegex = map[string]string{
// %Y - Year, zero-padded (0001, 0002, ..., 2019, 2020, ..., 9999)
"%Y": "[0-9]{4}",
// %y - Year, last two digits, zero-padded (01, ..., 99)
"%y": "[0-9]{2}",
// %m - Month as a decimal number (01, 02, ..., 12)
"%m": "[0-9]{2}",
// %o - Month as a space-padded number ( 1, 2, ..., 12)
"%o": "_[0-9]",
// %q - Month as a unpadded number (1,2,...,12)
"%q": "[0-9]",
// %b, %h - Abbreviated month name (Jan, Feb, ...)
"%b": "[a-zA-Z]*?",
"%h": "[a-zA-Z]*?",
// %B - Full month name (January, February, ...)
"%B": "[a-zA-Z]*?",
// %d - Day of the month, zero-padded (01, 02, ..., 31)
"%d": "[0-9]{2}",
// %e - Day of the month, space-padded ( 1, 2, ..., 31)
"%e": "_[0-9]",
// %g - Day of the month, unpadded (1,2,...,31)
"%g": "[0-9]",
// %a - Abbreviated weekday name (Sun, Mon, ...)
"%a": "[a-zA-Z]*?",
// %A - Full weekday name (Sunday, Monday, ...)
"%A": "[a-zA-Z]*?",
// %H - Hour (24-hour clock) as a zero-padded decimal number (00, ..., 24)
"%H": "[0-9]{2}",
// %l - Hour (12-hour clock: 0, ..., 12)
"%l": "[0-9]{1-2}",
// %I - Hour (12-hour clock) as a zero-padded decimal number (00, ..., 12)
"%I": "[0-9]{2}",
// %p - Locales equivalent of either AM or PM
"%p": "(AM|PM)",
// %P - Locales equivalent of either am or pm
"%P": "(am|pm)",
// %M - Minute, zero-padded (00, 01, ..., 59)
"%M": "[0-9]{2}",
// %S - Second as a zero-padded decimal number (00, 01, ..., 59)
"%S": "[0-9]{2}",
// %L - Millisecond as a decimal number, zero-padded on the left (000, 001, ..., 999)
"%L": "[0-9]*?",
// %f - Microsecond as a decimal number, zero-padded on the left (000000, ..., 999999)
"%f": "[0-9]*?",
// %s - Nanosecond as a decimal number, zero-padded on the left (000000, ..., 999999)
"%s": "[0-9]*?",
// %Z - Timezone name or abbreviation or empty (UTC, EST, CST)
"%Z": "[a-zA-Z]*?",
// %z - UTC offset in the form ±HHMM[SS[.ffffff]] or empty(+0000, -0400)
"%z": "[-+][0-9]*?",
// Weekday as a decimal number, where 0 is Sunday and 6 is Saturday.
"%w": "[-+][0-9]*?",
"%i": "[-+][0-9]*?",
"%j": "[-+][0-9]{2}:[0-9]{2}",
"%k": "[-+][0-9]{2}:[0-9]{2}:[0-9]{2}",
// %D, %x - Short MM/DD/YY date, equivalent to %m/%d/%y
"%D": "[0-9]{2}/[0-9]{2}/[0-9]{4}",
// %D, %x - Short MM/DD/YY date, equivalent to %m/%d/%y
"%x": "[0-9]{2}/[0-9]{2}/[0-9]{4}",
// %F - Short YYYY-MM-DD date, equivalent to %Y-%m-%d
"%F": "[0-9]{4}-[0-9]{2}-[0-9]{2}",
// %T, %X - ISO 8601 time format (HH:MM:SS), equivalent to %H:%M:%S
"%T": "[0-9]{2}:[0-9]{2}:[0-9]{2}",
// %T, %X - ISO 8601 time format (HH:MM:SS), equivalent to %H:%M:%S
"%X": "[0-9]{2}:[0-9]{2}:[0-9]{2}",
// %r - 12-hour clock time (02:55:02 pm)
"%r": "[0-9]{2}:[0-9]{2}:[0-9]{2} (am|pm)",
// %R - 24-hour HH:MM time, equivalent to %H:%M
"%R": "[0-9]{2}:[0-9]{2}",
// %n - New-line character ('\n')
"%n": "\n",
// %t - Horizontal-tab character ('\t')
"%t": "\t",
// %% - A % sign
"%%": "%",
// %c - Date and time representation (Mon Jan 02 15:04:05 2006)
"%c": "[a-zA-Z]{3} [a-zA-Z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4}",
}
// strptimeDirectiveRegexp matches one strptime directive: a percent sign
// followed by any single character. Compiled once at package scope.
var strptimeDirectiveRegexp = regexp.MustCompile(`%.`)

// strptimeLayoutEscaper escapes the literal (non-directive) text of a layout.
//
// The generated regex is embedded in a double-quoted expression string
// (`x matches "<regex>"`), which consumes one level of backslash escaping
// before the regex engine sees the pattern. So:
//   - a regex metacharacter X is emitted as \\X  (regex receives \X)
//   - a literal backslash is emitted as \\\\     (regex receives \\)
//   - a double quote is emitted as \"            (regex receives ")
//
// A Replacer works in a single pass, so the result does not depend on the
// order in which the characters are listed.
var strptimeLayoutEscaper = strings.NewReplacer(
	`\`, `\\\\`,
	`"`, `\"`,
	".", `\\.`,
	"+", `\\+`,
	"*", `\\*`,
	"?", `\\?`,
	"^", `\\^`,
	"$", `\\$`,
	"(", `\\(`,
	")", `\\)`,
	"[", `\\[`,
	"]", `\\]`,
	"{", `\\{`,
	"}", `\\}`,
	"|", `\\|`,
)

// RegexForStrptimeLayout converts a strptime layout such as "%Y-%m-%d" into
// a regex matching values written in that layout.
//
// Literal text in the layout is escaped and every directive is replaced by
// its fragment from ctimeRegex. The returned pattern is not anchored and is
// already escaped for use inside a double-quoted expression string.
//
// An error listing every offending directive is returned when the layout uses
// a directive that has no entry in ctimeRegex; the returned string is empty
// in that case.
func RegexForStrptimeLayout(layout string) (string, error) {
	// Escape first: the substituted fragments contain regex syntax that must
	// stay unescaped.
	layoutRegex := strptimeLayoutEscaper.Replace(layout)

	var errs []error
	replaceStrptimeDirectiveWithRegex := func(directive string) string {
		if regex, ok := ctimeRegex[directive]; ok {
			return regex
		}
		// Keep going so that all unsupported directives get reported at once.
		errs = append(errs, errors.New("unsupported ctimefmt directive: "+directive))
		return ""
	}

	layoutRegex = strptimeDirectiveRegexp.ReplaceAllStringFunc(layoutRegex, replaceStrptimeDirectiveWithRegex)
	if len(errs) != 0 {
		return "", fmt.Errorf("couldn't generate regex for ctime format: %v", errs)
	}

	return layoutRegex, nil
}

View File

@ -0,0 +1,136 @@
package logparsingpipeline
import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
"github.com/antonmedv/expr"
"github.com/stretchr/testify/require"
"go.signoz.io/signoz/pkg/query-service/model"
v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)
// TestRegexForStrptimeLayout checks that the regex generated for a strptime
// layout accepts values written in that layout and rejects unrelated text.
// The regex is evaluated through an expr `matches` expression, which is the
// same way it is used in the operator's `if` condition.
func TestRegexForStrptimeLayout(t *testing.T) {
	require := require.New(t)

	var testCases = []struct {
		strptimeLayout string
		str            string
		shouldMatch    bool
	}{
		{
			strptimeLayout: "%Y-%m-%dT%H:%M:%S.%f%z",
			str:            "2023-11-26T12:03:28.239907+0530",
			shouldMatch:    true,
		}, {
			strptimeLayout: "%d-%m-%Y",
			str:            "26-11-2023",
			shouldMatch:    true,
		}, {
			strptimeLayout: "%d/%m/%y",
			str:            "11/03/02",
			shouldMatch:    true,
		}, {
			// The generated regex is not anchored, so trailing text after a
			// valid timestamp still counts as a match.
			strptimeLayout: "%A, %d. %B %Y %I:%M%p",
			str:            "Tuesday, 21. November 2006 04:30PM11/03/02",
			shouldMatch:    true,
		}, {
			strptimeLayout: "%A, %d. %B %Y %I:%M%p",
			str:            "some random text",
			shouldMatch:    false,
		},
	}

	for _, test := range testCases {
		regex, err := RegexForStrptimeLayout(test.strptimeLayout)
		require.Nil(err, test.strptimeLayout)

		code := fmt.Sprintf(`"%s" matches "%s"`, test.str, regex)
		program, err := expr.Compile(code)
		require.Nil(err, test.strptimeLayout)

		output, err := expr.Run(program, map[string]string{})
		require.Nil(err, test.strptimeLayout)
		// Expected value goes first so failure messages are labelled correctly.
		require.Equal(test.shouldMatch, output, test.strptimeLayout)
	}
}
// TestTimestampParsingProcessor sends one log through a pipeline whose only
// operator is a strptime based time_parser, and verifies that the processed
// log's timestamp equals the instant stored in the parsed attribute and that
// the collector reported no warnings or errors along the way.
func TestTimestampParsingProcessor(t *testing.T) {
	assert := require.New(t)

	// The operator is decoded from JSON so that the json field tags of the
	// time_parser settings are exercised too.
	var timeParser PipelineOperator
	unmarshalErr := json.Unmarshal([]byte(`
{
"orderId": 1,
"enabled": true,
"type": "time_parser",
"name": "Test timestamp parser",
"id": "test-timestamp-parser",
"parse_from": "attributes.test_timestamp",
"layout_type": "strptime",
"layout": "%Y-%m-%dT%H:%M:%S.%f%z"
}
`), &timeParser)
	assert.Nil(unmarshalErr)

	// Pipeline that only picks up logs whose "method" attribute is GET.
	methodIsGet := v3.FilterItem{
		Key: v3.AttributeKey{
			Key:      "method",
			DataType: v3.AttributeKeyDataTypeString,
			Type:     v3.AttributeKeyTypeTag,
		},
		Operator: "=",
		Value:    "GET",
	}
	pipelines := []Pipeline{
		{
			OrderId: 1,
			Name:    "pipeline1",
			Alias:   "pipeline1",
			Enabled: true,
			Filter: &v3.FilterSet{
				Operator: "AND",
				Items:    []v3.FilterItem{methodIsGet},
			},
			Config: []PipelineOperator{timeParser},
		},
	}

	const rawTimestamp = "2023-11-27T12:03:28.239907+0530"
	inputLog := makeTestLogEntry(
		"test log",
		map[string]string{
			"method":         "GET",
			"test_timestamp": rawTimestamp,
		},
	)

	output, collectorLogs, simErr := SimulatePipelinesProcessing(
		context.Background(),
		pipelines,
		[]model.SignozLog{inputLog},
	)
	assert.Nil(simErr)
	assert.Equal(1, len(output))
	assert.Equal(0, len(collectorLogs), strings.Join(collectorLogs, "\n"))

	// Parse the same text with the equivalent Go layout to get the expected
	// instant.
	want, parseErr := time.Parse("2006-01-02T15:04:05.999999-0700", rawTimestamp)
	assert.Nil(parseErr)

	got := output[0]
	assert.Equal(uint64(want.UnixNano()), got.Timestamp)
}