API for fields added

nityanandagohain 2022-07-12 16:38:26 +05:30
parent 80c80b2180
commit ef141d2cee
9 changed files with 359 additions and 91 deletions

View File

@@ -18,16 +18,20 @@ const (
)
const (
defaultDatasource string = "tcp://localhost:9000"
defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "signoz_operations"
defaultIndexTable string = "signoz_index_v2"
defaultErrorTable string = "signoz_error_index"
defaulDurationTable string = "durationSortMV"
defaultSpansTable string = "signoz_spans"
defaultLogsDB string = "signoz_logs"
defaultLogsTable string = "logs"
defaultLogAttributeKeysTable string = "logs_atrribute_keys"
defaultLogResourceKeysTable string = "logs_resource_keys"
defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000
defaultEncoding Encoding = EncodingJSON
)
const (
@@ -43,19 +47,23 @@ const (
// namespaceConfig is ClickHouse's internal configuration data
type namespaceConfig struct {
namespace string
Enabled bool
Datasource string
TraceDB string
OperationsTable string
IndexTable string
DurationTable string
SpansTable string
ErrorTable string
LogsDB string
LogsTable string
LogsAttributeKeysTable string
LogsResourceKeysTable string
WriteBatchDelay time.Duration
WriteBatchSize int
Encoding Encoding
Connector Connector
}
// Connector defines how to connect to the database
@@ -102,19 +110,23 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
options := &Options{
primary: &namespaceConfig{
namespace: primaryNamespace,
Enabled: true,
Datasource: datasource,
TraceDB: defaultTraceDB,
OperationsTable: defaultOperationsTable,
IndexTable: defaultIndexTable,
ErrorTable: defaultErrorTable,
DurationTable: defaulDurationTable,
SpansTable: defaultSpansTable,
LogsDB: defaultLogsDB,
LogsTable: defaultLogsTable,
LogsAttributeKeysTable: defaultLogAttributeKeysTable,
LogsResourceKeysTable: defaultLogResourceKeysTable,
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
}
@@ -122,16 +134,20 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
for _, namespace := range otherNamespaces {
if namespace == archiveNamespace {
options.others[namespace] = &namespaceConfig{
namespace: namespace,
Datasource: datasource,
TraceDB: "",
OperationsTable: "",
IndexTable: "",
ErrorTable: "",
LogsDB: "",
LogsTable: "",
LogsAttributeKeysTable: "",
LogsResourceKeysTable: "",
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,
}
} else {
options.others[namespace] = &namespaceConfig{namespace: namespace}
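For context, a short sketch of how these options are consumed from inside the clickhouseReader package; the datasource address and the "primary" namespace name are illustrative values, not fixed by this commit:
// Hedged sketch: the TCP address and namespace are hypothetical.
opts := NewOptions("tcp://localhost:9000", "primary")
// The primary namespace now carries the logs configuration as well:
fmt.Println(opts.primary.LogsDB)    // signoz_logs
fmt.Println(opts.primary.LogsTable) // logs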

View File

@@ -83,19 +83,23 @@ var (
// ClickHouseReader is used for reading spans and log fields from ClickHouse
type ClickHouseReader struct {
db clickhouse.Conn
localDB *sqlx.DB
traceDB string
operationsTable string
durationTable string
indexTable string
errorTable string
spansTable string
logsDB string
logsTable string
logsAttributeKeys string
logsResourceKeys string
queryEngine *promql.Engine
remoteStorage *remote.Storage
ruleManager *rules.Manager
promConfig *config.Config
alertManager am.Manager
}
// NewReader returns a ClickHouseReader for the database
@@ -113,15 +117,19 @@ func NewReader(localDB *sqlx.DB) *ClickHouseReader {
alertManager := am.New("")
return &ClickHouseReader{
db: db,
localDB: localDB,
traceDB: options.primary.TraceDB,
alertManager: alertManager,
operationsTable: options.primary.OperationsTable,
indexTable: options.primary.IndexTable,
errorTable: options.primary.ErrorTable,
durationTable: options.primary.DurationTable,
spansTable: options.primary.SpansTable,
logsDB: options.primary.LogsDB,
logsTable: options.primary.LogsTable,
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
logsResourceKeys: options.primary.LogsResourceKeysTable,
}
}
@@ -2985,3 +2993,89 @@ func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Con
return totalSamples, nil
}
func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) {
// The response will contain top-level fields from the OTel log model
response := model.GetFieldsResponse{
Selected: constants.StaticSelectedLogFields,
Interesting: []model.LogField{},
}
// get attribute keys
attributes := &[]model.LogField{}
query := fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsAttributeKeys)
err := r.db.Select(ctx, attributes, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
// get resource keys
resources := &[]model.LogField{}
query = fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsResourceKeys)
err = r.db.Select(ctx, resources, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
statements := []model.CreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, attributes, &response)
extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, resources, &response)
extractSelectedAndInterestingFields(statements[0].Statement, constants.Static, &constants.StaticInterestingLogFields, &response)
return &response, nil
}
func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) {
for _, field := range *fields {
field.Type = fieldType
if strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field.Name)) {
response.Selected = append(response.Selected, field)
} else {
response.Interesting = append(response.Interesting, field)
}
}
}
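To make the classification rule concrete: a field counts as selected only when the SHOW CREATE TABLE output already contains an index named <field>_idx. A hedged sketch, with a made-up table statement and attribute keys:
// "method" is indexed in the statement below, "host" is not.
stmt := "CREATE TABLE signoz_logs.logs ( ... INDEX method_idx (method) TYPE bloom_filter(0.01) GRANULARITY 64 ... )"
fields := []model.LogField{{Name: "method", DataType: "String"}, {Name: "host", DataType: "String"}}
resp := model.GetFieldsResponse{}
extractSelectedAndInterestingFields(stmt, constants.Attributes, &fields, &resp)
// resp.Selected now contains "method"; resp.Interesting contains "host".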
func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
// if a field is selected it means that the field is indexed
if field.Selected {
// if the type is attribute or resource, create the materialized column first
if field.Type == constants.Attributes || field.Type == constants.Resources {
// create the materialized column
query := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')]", r.logsDB, r.logsTable, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
}
// create the index
if field.IndexType == nil {
iType := constants.DefaultLogSkipIndexType
field.IndexType = &iType
}
if field.IndexGranularity == nil {
granularity := constants.DefaultLogSkipIndexGranularity
field.IndexGranularity = &granularity
}
query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, *field.IndexType, *field.IndexGranularity)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
} else {
// remove index
query := fmt.Sprintf("ALTER TABLE %s.%s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsTable, field.Name)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
}
return nil
}
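To illustrate what UpdateLogField executes, take a hypothetical payload with name "method", dataType "String", type "attributes", and selected true. With the default index settings, the two fmt.Sprintf templates above render to:
ALTER TABLE signoz_logs.logs ADD COLUMN IF NOT EXISTS method String MATERIALIZED attributes_string_value[indexOf(attributes_string_key, 'method')]
ALTER TABLE signoz_logs.logs ADD INDEX IF NOT EXISTS method_idx (method) TYPE bloom_filter(0.01) GRANULARITY 64
Sending the same payload with selected set to false only drops the index (DROP INDEX IF EXISTS method_idx); the materialized column is kept.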

View File

@@ -16,6 +16,7 @@ import (
_ "github.com/mattn/go-sqlite3"
"github.com/prometheus/prometheus/promql"
"go.signoz.io/query-service/app/dashboards"
"go.signoz.io/query-service/app/logs"
"go.signoz.io/query-service/app/metrics"
"go.signoz.io/query-service/app/parser"
"go.signoz.io/query-service/auth"
@@ -1816,3 +1817,43 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response
w.Header().Set("Content-Type", "application/json")
w.Write(resp)
}
// logs
func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) {
subRouter := router.PathPrefix("/api/v1/logs").Subrouter()
subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet)
subRouter.HandleFunc("/fields", ViewAccess(aH.logFieldUpdate)).Methods(http.MethodPost)
}
func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) {
fields, apiErr := (*aH.reader).GetLogFields(r.Context())
if apiErr != nil {
respondError(w, apiErr, "Failed to fetch log fields from the DB")
return
}
aH.writeJSON(w, r, fields)
}
func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) {
field := model.UpdateField{}
if err := json.NewDecoder(r.Body).Decode(&field); err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Failed to decode payload")
return
}
err := logs.ValidateUpdateFieldPayload(&field)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Incorrect payload")
return
}
apiErr := (*aH.reader).UpdateLogField(r.Context(), &field)
if apiErr != nil {
respondError(w, apiErr, "Failed to update the log field")
return
}
aH.writeJSON(w, r, field)
}
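A minimal client sketch for the two new routes, assuming the query service listens on localhost:8080 (a hypothetical address) and that the ViewAccess check passes:
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// List selected and interesting log fields.
	res, err := http.Get("http://localhost:8080/api/v1/logs/fields")
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(res.Body)
	res.Body.Close()
	fmt.Println(string(body))

	// Index a hypothetical attribute "method"; note that UpdateField's
	// IndexType travels under the json key "index".
	payload := []byte(`{"name":"method","dataType":"String","type":"attributes","selected":true}`)
	res, err = http.Post("http://localhost:8080/api/v1/logs/fields", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	fmt.Println(res.Status)
	res.Body.Close()
}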

View File

@@ -0,0 +1,40 @@
package logs
import (
"fmt"
"regexp"
"go.signoz.io/query-service/constants"
"go.signoz.io/query-service/model"
)
func ValidateUpdateFieldPayload(field *model.UpdateField) error {
if field.Name == "" {
return fmt.Errorf("name cannot be empty")
}
if field.Type == "" {
return fmt.Errorf("type cannot be empty")
}
if field.DataType == "" {
return fmt.Errorf("dataType cannot be empty")
}
matched, err := regexp.MatchString(fmt.Sprintf("^(%s|%s|%s)$", constants.Static, constants.Attributes, constants.Resources), field.Type)
if err != nil {
return err
}
if !matched {
return fmt.Errorf("type %s not supported", field.Type)
}
if field.IndexType != nil {
matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, *field.IndexType)
if err != nil {
return err
}
if !matched {
return fmt.Errorf("index type %s not supported", *field.IndexType)
}
}
return nil
}
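A short sketch of the validator's behavior; the field values are hypothetical:
// Accepted: type must match static|attributes|resources.
f := model.UpdateField{Name: "method", DataType: "String", Type: constants.Attributes}
fmt.Println(logs.ValidateUpdateFieldPayload(&f)) // <nil>

// Rejected: unknown type.
f.Type = "labels"
fmt.Println(logs.ValidateUpdateFieldPayload(&f)) // type labels not supported

// Accepted: a supported skip-index expression.
idx := "tokenbf_v1(10240,3,0)"
f = model.UpdateField{Name: "method", DataType: "String", Type: constants.Attributes, IndexType: &idx}
fmt.Println(logs.ValidateUpdateFieldPayload(&f)) // <nil>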

View File

@@ -145,6 +145,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
api.RegisterRoutes(r)
api.RegisterMetricsRoutes(r)
api.RegisterLogsRoutes(r)
c := cors.New(cors.Options{
AllowedOrigins: []string{"*"},

View File

@@ -3,6 +3,8 @@ package constants
import (
"os"
"strconv"
"go.signoz.io/query-service/model"
)
const (
@@ -38,29 +40,34 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout
var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db")
const (
ServiceName = "serviceName"
HttpRoute = "httpRoute"
HttpCode = "httpCode"
HttpHost = "httpHost"
HttpUrl = "httpUrl"
HttpMethod = "httpMethod"
Component = "component"
OperationDB = "name"
OperationRequest = "operation"
Status = "status"
Duration = "duration"
DBName = "dbName"
DBOperation = "dbOperation"
DBSystem = "dbSystem"
MsgSystem = "msgSystem"
MsgOperation = "msgOperation"
Timestamp = "timestamp"
Descending = "descending"
Ascending = "ascending"
ContextTimeout = 60 // seconds
StatusPending = "pending"
StatusFailed = "failed"
StatusSuccess = "success"
Attributes = "attributes"
Resources = "resources"
Static = "static"
DefaultLogSkipIndexType = "bloom_filter(0.01)"
DefaultLogSkipIndexGranularity = 64
)
const (
SIGNOZ_METRIC_DBNAME = "signoz_metrics"
@@ -75,3 +82,44 @@ func GetOrDefaultEnv(key string, fallback string) string {
}
return v
}
var StaticInterestingLogFields = []model.LogField{
{
Name: "trace_id",
DataType: "String",
Type: Static,
},
{
Name: "span_id",
DataType: "String",
Type: Static,
},
{
Name: "trace_flags",
DataType: "UInt32",
Type: Static,
},
{
Name: "severity_text",
DataType: "LowCardinality(String)",
Type: Static,
},
{
Name: "severity_number",
DataType: "Int32",
Type: Static,
},
}
var StaticSelectedLogFields = []model.LogField{
{
Name: "timestamp",
DataType: "UInt64",
Type: Static,
},
{
Name: "id",
DataType: "String",
Type: Static,
},
}

View File

@@ -59,4 +59,8 @@ type Reader interface {
GetSpansInLastHeartBeatInterval(ctx context.Context) (uint64, error)
GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error)
GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error)
// Logs
GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
}

View File

@@ -303,3 +303,12 @@ type FilterSet struct {
Operator string `json:"op,omitempty"`
Items []FilterItem `json:"items"`
}
type UpdateField struct {
Name string `json:"name"`
DataType string `json:"dataType"`
Type string `json:"type"`
Selected bool `json:"selected"`
IndexType *string `json:"index"`
IndexGranularity *int `json:"indexGranularity"`
}
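Note the JSON mapping: IndexType is sent under the key "index", not "indexType". A hypothetical request body exercising every field:
{"name":"method","dataType":"String","type":"attributes","selected":true,"index":"bloom_filter(0.01)","indexGranularity":64}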

View File

@@ -373,3 +373,18 @@ func (p *MetricPoint) MarshalJSON() ([]byte, error) {
v := strconv.FormatFloat(p.Value, 'f', -1, 64)
return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v})
}
type CreateTableStatement struct {
Statement string `json:"statement" ch:"statement"`
}
type LogField struct {
Name string `json:"name" ch:"name"`
DataType string `json:"dataType" ch:"datatype"`
Type string `json:"type"`
}
type GetFieldsResponse struct {
Selected []LogField `json:"selected"`
Interesting []LogField `json:"interesting"`
}
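For reference, a GET /api/v1/logs/fields response serializes along these lines; the entries shown are illustrative:
{"selected":[{"name":"timestamp","dataType":"UInt64","type":"static"},{"name":"id","dataType":"String","type":"static"}],"interesting":[{"name":"host","dataType":"String","type":"attributes"}]}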