diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go
index 4625aa226b..ffea2a9b7e 100644
--- a/pkg/query-service/app/clickhouseReader/reader.go
+++ b/pkg/query-service/app/clickhouseReader/reader.go
@@ -3581,8 +3581,8 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
 	return nil
 }
 
-func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) {
-	response := []model.GetLogsResponse{}
+func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError) {
+	response := []model.SignozLog{}
 	fields, apiErr := r.GetLogFields(ctx)
 	if apiErr != nil {
 		return nil, apiErr
 	}
@@ -3678,7 +3678,7 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
 			}
 			tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)
 			zap.S().Debug(tmpQuery)
-			response := []model.GetLogsResponse{}
+			response := []model.SignozLog{}
 			err := r.db.Select(ctx, &response, tmpQuery)
 			if err != nil {
 				zap.S().Error(err)
@@ -4702,7 +4702,7 @@ func (r *ClickHouseReader) LiveTailLogsV3(ctx context.Context, query string, tim
 			tmpQuery = query + tmpQuery + " order by timestamp desc, id desc limit 100"
 
 			// using the old structure since we can directly read it into the struct and use it.
-			response := []model.GetLogsResponse{}
+			response := []model.SignozLog{}
 			err := r.db.Select(ctx, &response, tmpQuery)
 			if err != nil {
 				zap.S().Error(err)
diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go
index 0729338158..9045967077 100644
--- a/pkg/query-service/app/http_handler.go
+++ b/pkg/query-service/app/http_handler.go
@@ -2379,7 +2379,7 @@ func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
 	}
 
 	// create the client
-	client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params}
+	client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params}
 	go aH.reader.TailLogs(r.Context(), client)
 
 	w.Header().Set("Connection", "keep-alive")
@@ -3160,7 +3160,7 @@ func (aH *APIHandler) liveTailLogs(w http.ResponseWriter, r *http.Request) {
 	}
 
 	// create the client
-	client := &v3.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error)}
+	client := &v3.LogsLiveTailClient{Name: r.RemoteAddr, Logs: make(chan *model.SignozLog, 1000), Done: make(chan *bool), Error: make(chan error)}
 	go aH.reader.LiveTailLogsV3(r.Context(), queryString, uint64(queryRangeParams.Start), "", client)
 
 	w.Header().Set("Connection", "keep-alive")
diff --git a/pkg/query-service/app/logparsingpipeline/preview.go b/pkg/query-service/app/logparsingpipeline/preview.go
new file mode 100644
index 0000000000..9ce6839b00
--- /dev/null
+++ b/pkg/query-service/app/logparsingpipeline/preview.go
@@ -0,0 +1,254 @@
+package logparsingpipeline
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"time"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logstransformprocessor"
+	"github.com/pkg/errors"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/processor"
+	"go.signoz.io/signoz/pkg/query-service/collectorsimulator"
"go.signoz.io/signoz/pkg/query-service/model" + "gopkg.in/yaml.v3" +) + +func SimulatePipelinesProcessing( + ctx context.Context, + pipelines []Pipeline, + logs []model.SignozLog, +) ( + []model.SignozLog, *model.ApiError, +) { + + if len(pipelines) < 1 { + return logs, nil + } + + // Collector simulation does not guarantee that logs will come + // out in the same order as in the input. + // + // Add a temp attribute for sorting logs in simulation output + inputOrderAttribute := "__signoz_input_idx__" + for i := 0; i < len(logs); i++ { + if logs[i].Attributes_int64 == nil { + logs[i].Attributes_int64 = map[string]int64{} + } + logs[i].Attributes_int64[inputOrderAttribute] = int64(i) + } + simulatorInputPLogs := SignozLogsToPLogs(logs) + + // Simulate processing of logs through an otel collector + processorConfigs, err := collectorProcessorsForPipelines(pipelines) + if err != nil { + return nil, model.BadRequest(errors.Wrap( + err, "could not prepare otel processors for pipelines", + )) + } + + processorFactories, err := processor.MakeFactoryMap( + logstransformprocessor.NewFactory(), + ) + if err != nil { + return nil, model.InternalError(errors.Wrap( + err, "could not construct processor factory map", + )) + } + + // Pipelines translate to logtransformprocessors in otel collector config. + // Each logtransformprocessor (stanza) does its own batching with a flush + // interval of 100ms. So e2e processing time for logs grows linearly with + // the number of logtransformprocessors involved. + // See defaultFlushInterval at https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/adapter/emitter.go + // TODO(Raj): Remove this after flushInterval is exposed in logtransformprocessor config + timeout := time.Millisecond * time.Duration(len(processorConfigs)*100+100) + + outputPLogs, collectorErrs, apiErr := collectorsimulator.SimulateLogsProcessing( + ctx, + processorFactories, + processorConfigs, + simulatorInputPLogs, + timeout, + ) + collectorErrsText := strings.Join(collectorErrs, "\n") + if apiErr != nil { + return nil, model.WrapApiError(apiErr, fmt.Sprintf( + "could not simulate log pipelines processing.\nCollector errors: %s\n", collectorErrsText, + )) + } + + outputSignozLogs := PLogsToSignozLogs(outputPLogs) + + // Sort output logs by their order in the input and remove the temp ordering attribute + sort.Slice(outputSignozLogs, func(i, j int) bool { + iIdx := outputSignozLogs[i].Attributes_int64[inputOrderAttribute] + jIdx := outputSignozLogs[j].Attributes_int64[inputOrderAttribute] + return iIdx < jIdx + }) + for _, sigLog := range outputSignozLogs { + delete(sigLog.Attributes_int64, inputOrderAttribute) + } + + return outputSignozLogs, nil +} + +func collectorProcessorsForPipelines(pipelines []Pipeline) ( + []collectorsimulator.ProcessorConfig, error, +) { + processors, procNames, err := PreparePipelineProcessor(pipelines) + if err != nil { + return nil, err + } + + processorConfigs := []collectorsimulator.ProcessorConfig{} + for _, procName := range procNames { + // convert `Processor` structs to map[string]interface{} + procYaml, err := yaml.Marshal(processors[procName]) + if err != nil { + return nil, errors.Wrap(err, "could not marshal Processor struct") + } + var procConfRaw map[string]interface{} + err = yaml.Unmarshal(procYaml, &procConfRaw) + if err != nil { + return nil, errors.Wrap(err, "could not unmarshal proc yaml") + } + + processorConfigs = append(processorConfigs, collectorsimulator.ProcessorConfig{ + Name: procName, + Config: 
+			Config: procConfRaw,
+		})
+	}
+
+	return processorConfigs, nil
+}
+
+// plog doesn't contain an ID field.
+// SignozLog.ID is stored as a log attribute in plogs for processing
+// and gets hydrated back later.
+const SignozLogIdAttr = "__signoz_log_id__"
+
+func SignozLogsToPLogs(logs []model.SignozLog) []plog.Logs {
+	result := []plog.Logs{}
+
+	for _, log := range logs {
+		pl := plog.NewLogs()
+		rl := pl.ResourceLogs().AppendEmpty()
+
+		resourceAttribs := rl.Resource().Attributes()
+		for k, v := range log.Resources_string {
+			resourceAttribs.PutStr(k, v)
+		}
+
+		scopeLog := rl.ScopeLogs().AppendEmpty()
+		slRecord := scopeLog.LogRecords().AppendEmpty()
+
+		slRecord.SetTimestamp(pcommon.NewTimestampFromTime(
+			time.Unix(0, int64(log.Timestamp)),
+		))
+
+		var traceIdBuf [16]byte
+		copy(traceIdBuf[:], []byte(log.TraceID))
+		slRecord.SetTraceID(traceIdBuf)
+
+		var spanIdBuf [8]byte
+		copy(spanIdBuf[:], []byte(log.SpanID))
+		slRecord.SetSpanID(spanIdBuf)
+
+		slRecord.SetFlags(plog.LogRecordFlags(log.TraceFlags))
+
+		slRecord.SetSeverityText(log.SeverityText)
+		slRecord.SetSeverityNumber(plog.SeverityNumber(log.SeverityNumber))
+
+		slRecord.Body().SetStr(log.Body)
+
+		slAttribs := slRecord.Attributes()
+		for k, v := range log.Attributes_int64 {
+			slAttribs.PutInt(k, v)
+		}
+		for k, v := range log.Attributes_float64 {
+			slAttribs.PutDouble(k, v)
+		}
+		for k, v := range log.Attributes_string {
+			slAttribs.PutStr(k, v)
+		}
+		slAttribs.PutStr(SignozLogIdAttr, log.ID)
+
+		result = append(result, pl)
+	}
+
+	return result
+}
+
+func PLogsToSignozLogs(plogs []plog.Logs) []model.SignozLog {
+	result := []model.SignozLog{}
+
+	for _, pl := range plogs {
+
+		resourceLogsSlice := pl.ResourceLogs()
+		for i := 0; i < resourceLogsSlice.Len(); i++ {
+			rl := resourceLogsSlice.At(i)
+
+			scopeLogsSlice := rl.ScopeLogs()
+			for j := 0; j < scopeLogsSlice.Len(); j++ {
+				sl := scopeLogsSlice.At(j)
+
+				lrSlice := sl.LogRecords()
+				for k := 0; k < lrSlice.Len(); k++ {
+					lr := lrSlice.At(k)
+
+					// Recover ID for the log and remove temp attrib used for storing it
+					signozLogId := ""
+					logIdVal, exists := lr.Attributes().Get(SignozLogIdAttr)
+					if exists {
+						signozLogId = logIdVal.Str()
+					}
+					lr.Attributes().Remove(SignozLogIdAttr)
+
+					signozLog := model.SignozLog{
+						Timestamp:          uint64(lr.Timestamp()),
+						ID:                 signozLogId,
+						TraceID:            lr.TraceID().String(),
+						SpanID:             lr.SpanID().String(),
+						TraceFlags:         uint32(lr.Flags()),
+						SeverityText:       lr.SeverityText(),
+						SeverityNumber:     uint8(lr.SeverityNumber()),
+						Body:               lr.Body().AsString(),
+						Resources_string:   pMapToStrMap(rl.Resource().Attributes()),
+						Attributes_string:  map[string]string{},
+						Attributes_int64:   map[string]int64{},
+						Attributes_float64: map[string]float64{},
+					}
+
+					// Populate signozLog.Attributes_...
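+					// pcommon values are typed: doubles and ints are routed into
+					// the matching typed maps; all other value types get stringified.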
+					lr.Attributes().Range(func(k string, v pcommon.Value) bool {
+						if v.Type() == pcommon.ValueTypeDouble {
+							signozLog.Attributes_float64[k] = v.Double()
+						} else if v.Type() == pcommon.ValueTypeInt {
+							signozLog.Attributes_int64[k] = v.Int()
+						} else {
+							signozLog.Attributes_string[k] = v.AsString()
+						}
+						return true
+					})
+
+					result = append(result, signozLog)
+				}
+			}
+		}
+	}
+
+	return result
+}
+
+func pMapToStrMap(pMap pcommon.Map) map[string]string {
+	result := map[string]string{}
+	pMap.Range(func(k string, v pcommon.Value) bool {
+		result[k] = v.AsString()
+		return true
+	})
+	return result
+}
diff --git a/pkg/query-service/app/logparsingpipeline/preview_test.go b/pkg/query-service/app/logparsingpipeline/preview_test.go
new file mode 100644
index 0000000000..c453cb1445
--- /dev/null
+++ b/pkg/query-service/app/logparsingpipeline/preview_test.go
@@ -0,0 +1,158 @@
+package logparsingpipeline
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/stretchr/testify/require"
+	"go.signoz.io/signoz/pkg/query-service/model"
+	v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
+)
+
+func TestPipelinePreview(t *testing.T) {
+	require := require.New(t)
+
+	testPipelines := []Pipeline{
+		{
+			OrderId: 1,
+			Name:    "pipeline1",
+			Alias:   "pipeline1",
+			Enabled: true,
+			Filter: &v3.FilterSet{
+				Operator: "AND",
+				Items: []v3.FilterItem{
+					{
+						Key: v3.AttributeKey{
+							Key:      "method",
+							DataType: v3.AttributeKeyDataTypeString,
+							Type:     v3.AttributeKeyTypeTag,
+						},
+						Operator: "=",
+						Value:    "GET",
+					},
+				},
+			},
+			Config: []PipelineOperator{
+				{
+					OrderId: 1,
+					ID:      "add",
+					Type:    "add",
+					Field:   "attributes.test",
+					Value:   "val",
+					Enabled: true,
+					Name:    "test add",
+				},
+			},
+		},
+		{
+			OrderId: 2,
+			Name:    "pipeline2",
+			Alias:   "pipeline2",
+			Enabled: true,
+			Filter: &v3.FilterSet{
+				Operator: "AND",
+				Items: []v3.FilterItem{
+					{
+						Key: v3.AttributeKey{
+							Key:      "method",
+							DataType: v3.AttributeKeyDataTypeString,
+							Type:     v3.AttributeKeyTypeTag,
+						},
+						Operator: "=",
+						Value:    "GET",
+					},
+				},
+			},
+			Config: []PipelineOperator{
+				{
+					OrderId: 1,
+					ID:      "add",
+					Type:    "add",
+					Field:   "resource.test1",
+					Value:   "val1",
+					Enabled: true,
+					Name:    "test add2",
+				}, {
+					OrderId: 2,
+					ID:      "add2",
+					Type:    "add",
+					Field:   "resource.test2",
+					Value:   "val2",
+					Enabled: true,
+					Name:    "test add3",
+				},
+			},
+		},
+	}
+
+	matchingLog := makeTestLogEntry(
+		"test log body",
+		map[string]string{
+			"method": "GET",
+		},
+	)
+	nonMatchingLog := makeTestLogEntry(
+		"test log body",
+		map[string]string{
+			"method": "POST",
+		},
+	)
+
+	result, err := SimulatePipelinesProcessing(
+		context.Background(),
+		testPipelines,
+		[]model.SignozLog{
+			matchingLog,
+			nonMatchingLog,
+		},
+	)
+
+	require.Nil(err)
+	require.Equal(2, len(result))
+
+	// matching log should have been modified as expected.
+	require.NotEqual(
+		matchingLog.Attributes_string,
+		result[0].Attributes_string,
+	)
+	testAttrValue := result[0].Attributes_string["test"]
+	require.NotNil(testAttrValue)
+	require.Equal(
+		testAttrValue, "val",
+	)
+
+	require.Equal(result[0].Resources_string, map[string]string{
+		"test1": "val1",
+		"test2": "val2",
+	})
+
+	// non-matching log should not be touched.
+	require.Equal(
+		nonMatchingLog.Attributes_string,
+		result[1].Attributes_string,
+	)
+	require.Equal(
+		nonMatchingLog.Resources_string,
+		result[1].Resources_string,
+	)
+
+}
+
+func makeTestLogEntry(
+	body string,
+	attributes map[string]string,
+) model.SignozLog {
+	return model.SignozLog{
+		Timestamp:         uint64(time.Now().UnixNano()),
+		Body:              body,
+		Attributes_string: attributes,
+		Resources_string:  map[string]string{},
+		SeverityText:      entry.Info.String(),
+		SeverityNumber:    uint8(entry.Info),
+		SpanID:            uuid.New().String(),
+		TraceID:           uuid.New().String(),
+	}
+}
diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go
index 15cc4868b3..b25888f607 100644
--- a/pkg/query-service/interfaces/interface.go
+++ b/pkg/query-service/interfaces/interface.go
@@ -81,7 +81,7 @@ type Reader interface {
 	// Logs
 	GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
 	UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
-	GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError)
+	GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError)
 	TailLogs(ctx context.Context, client *model.LogsTailClient)
 	AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError)
 	GetLogAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error)
diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go
index 3ca7126636..c80f223163 100644
--- a/pkg/query-service/model/response.go
+++ b/pkg/query-service/model/response.go
@@ -538,7 +538,8 @@ type GetFieldsResponse struct {
 	Interesting []LogField `json:"interesting"`
 }
 
-type GetLogsResponse struct {
+// Represents a log record in query service requests and responses.
+type SignozLog struct {
 	Timestamp uint64 `json:"timestamp" ch:"timestamp"`
 	ID        string `json:"id" ch:"id"`
 	TraceID   string `json:"trace_id" ch:"trace_id"`
@@ -555,7 +556,7 @@ type GetLogsResponse struct {
 
 type LogsTailClient struct {
 	Name   string
-	Logs   chan *GetLogsResponse
+	Logs   chan *SignozLog
 	Done   chan *bool
 	Error  chan error
 	Filter LogsFilterParams
diff --git a/pkg/query-service/model/v3/v3.go b/pkg/query-service/model/v3/v3.go
index bd9811fdaa..09851a423d 100644
--- a/pkg/query-service/model/v3/v3.go
+++ b/pkg/query-service/model/v3/v3.go
@@ -625,7 +625,7 @@ type Result struct {
 
 type LogsLiveTailClient struct {
 	Name  string
-	Logs  chan *model.GetLogsResponse
+	Logs  chan *model.SignozLog
 	Done  chan *bool
 	Error chan error
 }
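Reviewer note: a minimal sketch of how the new preview entry point is expected to be wired up by a caller. The previewPipelines helper below is hypothetical and not part of this diff; only SimulatePipelinesProcessing, Pipeline, and model.SignozLog come from the changes above.

package main

import (
	"context"

	"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
	"go.signoz.io/signoz/pkg/query-service/model"
)

// previewPipelines runs sample logs through the candidate pipelines and
// returns the transformed logs in input order (the simulation sorts its
// output using the temp ordering attribute described above).
func previewPipelines(
	ctx context.Context,
	pipelines []logparsingpipeline.Pipeline,
	sampleLogs []model.SignozLog,
) ([]model.SignozLog, *model.ApiError) {
	return logparsingpipeline.SimulatePipelinesProcessing(ctx, pipelines, sampleLogs)
}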