QS: logparsingpipeline previews (#3694)

* chore: rename model.GetLogsResponse -> model.SignozLog for use in both requests and responses

* feat: add test for simulating log pipelines processing

* feat: get pipeline preview tests passing

* chore: cleanup
Raj Kamal Singh 2023-10-09 15:25:13 +05:30 committed by GitHub
parent 503417719c
commit 718eb7b381
7 changed files with 423 additions and 10 deletions
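
The diff below renames model.GetLogsResponse to model.SignozLog (so the same struct can be used in both requests and responses) and adds a SimulatePipelinesProcessing helper that previews pipeline output by running logs through a simulated otel collector. For orientation, here is a minimal sketch of calling the new entry point, mirroring the fixtures in the test file further down; the import paths and the standalone main wrapper are assumptions for illustration, not part of this commit:

package main

import (
	"context"
	"fmt"
	"time"

	"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
	"go.signoz.io/signoz/pkg/query-service/model"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func main() {
	// One pipeline that adds attributes.test="val" to logs with method=GET,
	// mirroring the first fixture in the preview test in this commit.
	pipelines := []logparsingpipeline.Pipeline{{
		OrderId: 1,
		Name:    "pipeline1",
		Alias:   "pipeline1",
		Enabled: true,
		Filter: &v3.FilterSet{
			Operator: "AND",
			Items: []v3.FilterItem{{
				Key: v3.AttributeKey{
					Key:      "method",
					DataType: v3.AttributeKeyDataTypeString,
					Type:     v3.AttributeKeyTypeTag,
				},
				Operator: "=",
				Value:    "GET",
			}},
		},
		Config: []logparsingpipeline.PipelineOperator{{
			OrderId: 1,
			ID:      "add",
			Type:    "add",
			Field:   "attributes.test",
			Value:   "val",
			Enabled: true,
			Name:    "test add",
		}},
	}}

	logs := []model.SignozLog{{
		Timestamp:         uint64(time.Now().UnixNano()),
		Body:              "sample log body",
		Attributes_string: map[string]string{"method": "GET"},
		Resources_string:  map[string]string{},
	}}

	result, apiErr := logparsingpipeline.SimulatePipelinesProcessing(
		context.Background(), pipelines, logs,
	)
	if apiErr != nil {
		panic(apiErr.Err) // assuming ApiError exposes the wrapped error as Err
	}
	// The matching log should now carry the added attribute.
	fmt.Println(result[0].Attributes_string["test"]) // expected: "val"
}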

View File

@@ -3581,8 +3581,8 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
 	return nil
 }
-func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) {
-	response := []model.GetLogsResponse{}
+func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError) {
+	response := []model.SignozLog{}
 	fields, apiErr := r.GetLogFields(ctx)
 	if apiErr != nil {
 		return nil, apiErr
@@ -3678,7 +3678,7 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) {
 	}
 	tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)
 	zap.S().Debug(tmpQuery)
-	response := []model.GetLogsResponse{}
+	response := []model.SignozLog{}
 	err := r.db.Select(ctx, &response, tmpQuery)
 	if err != nil {
 		zap.S().Error(err)
@@ -4702,7 +4702,7 @@ func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, tim
 	tmpQuery = query + tmpQuery + " order by timestamp desc, id desc limit 100"
 	// using the old structure since we can directly read it into the struct and use it.
-	response := []model.GetLogsResponse{}
+	response := []model.SignozLog{}
 	err := r.db.Select(ctx, &response, tmpQuery)
 	if err != nil {
 		zap.S().Error(err)

View File

@@ -2379,7 +2379,7 @@ func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
 	}
 	// create the client
-	client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params}
+	client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params}
 	go aH.reader.TailLogs(r.Context(), client)
 	w.Header().Set("Connection", "keep-alive")
@@ -3160,7 +3160,7 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
 	}
 	// create the client
-	client := &v3.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error)}
+	client := &v3.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error)}
 	go aH.reader.LiveTailLogsV3(r.Context(), queryString, uint64(queryRangeParams.Start), "", client)
 	w.Header().Set("Connection", "keep-alive")

View File

@@ -0,0 +1,254 @@
package logparsingpipeline

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
	"github.com/pkg/errors"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/processor"
	"go.signoz.io/signoz/pkg/query-service/collectorsimulator"
	"go.signoz.io/signoz/pkg/query-service/model"
	"gopkg.in/yaml.v3"
)

func SimulatePipelinesProcessing(
	ctx context.Context,
	pipelines []Pipeline,
	logs []model.SignozLog,
) (
	[]model.SignozLog, *model.ApiError,
) {
	if len(pipelines) < 1 {
		return logs, nil
	}

	// Collector simulation does not guarantee that logs will come
	// out in the same order as in the input.
	//
	// Add a temp attribute for sorting logs in simulation output.
	inputOrderAttribute := "__signoz_input_idx__"
	for i := 0; i < len(logs); i++ {
		if logs[i].Attributes_int64 == nil {
			logs[i].Attributes_int64 = map[string]int64{}
		}
		logs[i].Attributes_int64[inputOrderAttribute] = int64(i)
	}
	simulatorInputPLogs := SignozLogsToPLogs(logs)

	// Simulate processing of logs through an otel collector.
	processorConfigs, err := collectorProcessorsForPipelines(pipelines)
	if err != nil {
		return nil, model.BadRequest(errors.Wrap(
			err, "could not prepare otel processors for pipelines",
		))
	}
	processorFactories, err := processor.MakeFactoryMap(
		logstransformprocessor.NewFactory(),
	)
	if err != nil {
		return nil, model.InternalError(errors.Wrap(
			err, "could not construct processor factory map",
		))
	}

	// Pipelines translate to logstransformprocessors in otel collector config.
	// Each logstransformprocessor (stanza) does its own batching with a flush
	// interval of 100ms, so e2e processing time for logs grows linearly with
	// the number of logstransformprocessors involved.
	// See defaultFlushInterval at https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/adapter/emitter.go
	// TODO(Raj): Remove this after flushInterval is exposed in logstransformprocessor config.
	timeout := time.Millisecond * time.Duration(len(processorConfigs)*100+100)

	outputPLogs, collectorErrs, apiErr := collectorsimulator.SimulateLogsProcessing(
		ctx,
		processorFactories,
		processorConfigs,
		simulatorInputPLogs,
		timeout,
	)
	collectorErrsText := strings.Join(collectorErrs, "\n")
	if apiErr != nil {
		return nil, model.WrapApiError(apiErr, fmt.Sprintf(
			"could not simulate log pipelines processing.\nCollector errors: %s\n", collectorErrsText,
		))
	}

	outputSignozLogs := PLogsToSignozLogs(outputPLogs)

	// Sort output logs by their order in the input and remove the temp ordering attribute.
	sort.Slice(outputSignozLogs, func(i, j int) bool {
		iIdx := outputSignozLogs[i].Attributes_int64[inputOrderAttribute]
		jIdx := outputSignozLogs[j].Attributes_int64[inputOrderAttribute]
		return iIdx < jIdx
	})
	for _, sigLog := range outputSignozLogs {
		delete(sigLog.Attributes_int64, inputOrderAttribute)
	}

	return outputSignozLogs, nil
}
func collectorProcessorsForPipelines(pipelines []Pipeline) (
	[]collectorsimulator.ProcessorConfig, error,
) {
	processors, procNames, err := PreparePipelineProcessor(pipelines)
	if err != nil {
		return nil, err
	}

	processorConfigs := []collectorsimulator.ProcessorConfig{}
	for _, procName := range procNames {
		// convert `Processor` structs to map[string]interface{}
		procYaml, err := yaml.Marshal(processors[procName])
		if err != nil {
			return nil, errors.Wrap(err, "could not marshal Processor struct")
		}
		var procConfRaw map[string]interface{}
		err = yaml.Unmarshal(procYaml, &procConfRaw)
		if err != nil {
			return nil, errors.Wrap(err, "could not unmarshal proc yaml")
		}

		processorConfigs = append(processorConfigs, collectorsimulator.ProcessorConfig{
			Name:   procName,
			Config: procConfRaw,
		})
	}

	return processorConfigs, nil
}
// plog doesn't contain an ID field.
// SignozLog.ID is stored as a log attribute in plogs for processing
// and gets hydrated back later.
const SignozLogIdAttr = "__signoz_log_id__"

func SignozLogsToPLogs(logs []model.SignozLog) []plog.Logs {
	result := []plog.Logs{}

	for _, log := range logs {
		pl := plog.NewLogs()
		rl := pl.ResourceLogs().AppendEmpty()

		resourceAttribs := rl.Resource().Attributes()
		for k, v := range log.Resources_string {
			resourceAttribs.PutStr(k, v)
		}

		scopeLog := rl.ScopeLogs().AppendEmpty()
		slRecord := scopeLog.LogRecords().AppendEmpty()

		slRecord.SetTimestamp(pcommon.NewTimestampFromTime(
			time.Unix(0, int64(log.Timestamp)),
		))

		var traceIdBuf [16]byte
		copy(traceIdBuf[:], []byte(log.TraceID))
		slRecord.SetTraceID(traceIdBuf)

		var spanIdBuf [8]byte
		copy(spanIdBuf[:], []byte(log.SpanID))
		slRecord.SetSpanID(spanIdBuf)

		slRecord.SetFlags(plog.LogRecordFlags(log.TraceFlags))

		slRecord.SetSeverityText(log.SeverityText)
		slRecord.SetSeverityNumber(plog.SeverityNumber(log.SeverityNumber))

		slRecord.Body().SetStr(log.Body)

		slAttribs := slRecord.Attributes()
		for k, v := range log.Attributes_int64 {
			slAttribs.PutInt(k, v)
		}
		for k, v := range log.Attributes_float64 {
			slAttribs.PutDouble(k, v)
		}
		for k, v := range log.Attributes_string {
			slAttribs.PutStr(k, v)
		}
		slAttribs.PutStr(SignozLogIdAttr, log.ID)

		result = append(result, pl)
	}

	return result
}
func PLogsToSignozLogs(plogs []plog.Logs) []model.SignozLog {
	result := []model.SignozLog{}

	for _, pl := range plogs {
		resourceLogsSlice := pl.ResourceLogs()
		for i := 0; i < resourceLogsSlice.Len(); i++ {
			rl := resourceLogsSlice.At(i)

			scopeLogsSlice := rl.ScopeLogs()
			for j := 0; j < scopeLogsSlice.Len(); j++ {
				sl := scopeLogsSlice.At(j)

				lrSlice := sl.LogRecords()
				for k := 0; k < lrSlice.Len(); k++ {
					lr := lrSlice.At(k)

					// Recover ID for the log and remove temp attrib used for storing it
					signozLogId := ""
					logIdVal, exists := lr.Attributes().Get(SignozLogIdAttr)
					if exists {
						signozLogId = logIdVal.Str()
					}
					lr.Attributes().Remove(SignozLogIdAttr)

					signozLog := model.SignozLog{
						Timestamp:          uint64(lr.Timestamp()),
						ID:                 signozLogId,
						TraceID:            lr.TraceID().String(),
						SpanID:             lr.SpanID().String(),
						TraceFlags:         uint32(lr.Flags()),
						SeverityText:       lr.SeverityText(),
						SeverityNumber:     uint8(lr.SeverityNumber()),
						Body:               lr.Body().AsString(),
						Resources_string:   pMapToStrMap(rl.Resource().Attributes()),
						Attributes_string:  map[string]string{},
						Attributes_int64:   map[string]int64{},
						Attributes_float64: map[string]float64{},
					}

					// Populate signozLog.Attributes_...
					lr.Attributes().Range(func(k string, v pcommon.Value) bool {
						if v.Type() == pcommon.ValueTypeDouble {
							signozLog.Attributes_float64[k] = v.Double()
						} else if v.Type() == pcommon.ValueTypeInt {
							signozLog.Attributes_int64[k] = v.Int()
						} else {
							signozLog.Attributes_string[k] = v.AsString()
						}
						return true
					})

					result = append(result, signozLog)
				}
			}
		}
	}

	return result
}
func pMapToStrMap(pMap pcommon.Map) map[string]string {
	result := map[string]string{}
	pMap.Range(func(k string, v pcommon.Value) bool {
		result[k] = v.AsString()
		return true
	})
	return result
}
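
The two conversion helpers above are intended to round-trip the fields that pipelines operate on. A small sketch of how the log ID survives the plog round trip via the temporary attribute; this is a hypothetical illustration in the same package, not part of this commit:

package logparsingpipeline

import (
	"fmt"
	"time"

	"go.signoz.io/signoz/pkg/query-service/model"
)

// ExampleLogIdRoundTrip is a hypothetical illustration, not part of this commit.
func ExampleLogIdRoundTrip() {
	in := []model.SignozLog{{
		ID:                "log-1",
		Timestamp:         uint64(time.Now().UnixNano()),
		Body:              "hello",
		Attributes_string: map[string]string{},
	}}

	// SignozLog.ID is carried through plog as the SignozLogIdAttr attribute
	// and hydrated back; the temp attribute is removed from the output.
	out := PLogsToSignozLogs(SignozLogsToPLogs(in))

	_, hasTempAttr := out[0].Attributes_string[SignozLogIdAttr]
	fmt.Println(out[0].ID == in[0].ID, hasTempAttr)
	// Output: true false
}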

View File

@@ -0,0 +1,158 @@
package logparsingpipeline

import (
	"context"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	"github.com/stretchr/testify/require"
	"go.signoz.io/signoz/pkg/query-service/model"
	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
)

func TestPipelinePreview(t *testing.T) {
	require := require.New(t)

	testPipelines := []Pipeline{
		{
			OrderId: 1,
			Name:    "pipeline1",
			Alias:   "pipeline1",
			Enabled: true,
			Filter: &v3.FilterSet{
				Operator: "AND",
				Items: []v3.FilterItem{
					{
						Key: v3.AttributeKey{
							Key:      "method",
							DataType: v3.AttributeKeyDataTypeString,
							Type:     v3.AttributeKeyTypeTag,
						},
						Operator: "=",
						Value:    "GET",
					},
				},
			},
			Config: []PipelineOperator{
				{
					OrderId: 1,
					ID:      "add",
					Type:    "add",
					Field:   "attributes.test",
					Value:   "val",
					Enabled: true,
					Name:    "test add",
				},
			},
		},
		{
			OrderId: 2,
			Name:    "pipeline2",
			Alias:   "pipeline2",
			Enabled: true,
			Filter: &v3.FilterSet{
				Operator: "AND",
				Items: []v3.FilterItem{
					{
						Key: v3.AttributeKey{
							Key:      "method",
							DataType: v3.AttributeKeyDataTypeString,
							Type:     v3.AttributeKeyTypeTag,
						},
						Operator: "=",
						Value:    "GET",
					},
				},
			},
			Config: []PipelineOperator{
				{
					OrderId: 1,
					ID:      "add",
					Type:    "add",
					Field:   "resource.test1",
					Value:   "val1",
					Enabled: true,
					Name:    "test add2",
				}, {
					OrderId: 2,
					ID:      "add2",
					Type:    "add",
					Field:   "resource.test2",
					Value:   "val2",
					Enabled: true,
					Name:    "test add3",
				},
			},
		},
	}

	matchingLog := makeTestLogEntry(
		"test log body",
		map[string]string{
			"method": "GET",
		},
	)
	nonMatchingLog := makeTestLogEntry(
		"test log body",
		map[string]string{
			"method": "POST",
		},
	)

	result, err := SimulatePipelinesProcessing(
		context.Background(),
		testPipelines,
		[]model.SignozLog{
			matchingLog,
			nonMatchingLog,
		},
	)
	require.Nil(err)
	require.Equal(2, len(result))

	// matching log should have been modified as expected.
	require.NotEqual(
		matchingLog.Attributes_string,
		result[0].Attributes_string,
	)
	testAttrValue := result[0].Attributes_string["test"]
	require.NotNil(testAttrValue)
	require.Equal(
		testAttrValue, "val",
	)
	require.Equal(result[0].Resources_string, map[string]string{
		"test1": "val1",
		"test2": "val2",
	})

	// non-matching log should not be touched.
	require.Equal(
		nonMatchingLog.Attributes_string,
		result[1].Attributes_string,
	)
	require.Equal(
		nonMatchingLog.Resources_string,
		result[1].Resources_string,
	)
}

func makeTestLogEntry(
	body string,
	attributes map[string]string,
) model.SignozLog {
	return model.SignozLog{
		Timestamp:         uint64(time.Now().UnixNano()),
		Body:              body,
		Attributes_string: attributes,
		Resources_string:  map[string]string{},
		SeverityText:      entry.Info.String(),
		SeverityNumber:    uint8(entry.Info),
		SpanID:            uuid.New().String(),
		TraceID:           uuid.New().String(),
	}
}
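
The test exercises the full simulation path, including the embedded collector. It can be run with `go test -run TestPipelinePreview ./...` from the query-service module (exact package path assumed), and takes on the order of a few hundred milliseconds because of the stanza flush interval discussed in the timeout comment above.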

View File

@@ -81,7 +81,7 @@ type Reader interface {
 	// Logs
 	GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
 	UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
-	GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError)
+	GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError)
 	TailLogs(ctx context.Context, client *model.LogsTailClient)
 	AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError)
 	GetLogAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)

View File

@@ -538,7 +538,8 @@ type GetFieldsResponse struct {
 	Interesting []LogField `json:"interesting"`
 }
-type GetLogsResponse struct {
+// Represents a log record in query service requests and responses.
+type SignozLog struct {
 	Timestamp uint64 `json:"timestamp" ch:"timestamp"`
 	ID        string `json:"id" ch:"id"`
 	TraceID   string `json:"trace_id" ch:"trace_id"`
@@ -555,7 +556,7 @@ type GetLogsResponse struct {
 type LogsTailClient struct {
 	Name   string
-	Logs   chan *GetLogsResponse
+	Logs   chan *SignozLog
 	Done   chan *bool
 	Error  chan error
 	Filter LogsFilterParams

View File

@@ -625,7 +625,7 @@ type Result struct {
 type LogsLiveTailClient struct {
 	Name  string
-	Logs  chan *model.GetLogsResponse
+	Logs  chan *model.SignozLog
 	Done  chan *bool
 	Error chan error
 }