Merge pull request #1379 from nityanandagohain/feat/logs

Support for Logs
This commit is contained in:
Ankit Nayan 2022-08-10 15:29:03 +05:30 committed by GitHub
commit a1f6f09ae1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1414 additions and 61 deletions

View File

@ -28,6 +28,11 @@ const (
defaultSpansTable string = "signoz_spans" defaultSpansTable string = "signoz_spans"
defaultDependencyGraphTable string = "dependency_graph_minutes" defaultDependencyGraphTable string = "dependency_graph_minutes"
defaultTopLevelOperationsTable string = "top_level_operations" defaultTopLevelOperationsTable string = "top_level_operations"
defaultLogsDB string = "signoz_logs"
defaultLogsTable string = "logs"
defaultLogAttributeKeysTable string = "logs_atrribute_keys"
defaultLogResourceKeysTable string = "logs_resource_keys"
defaultLiveTailRefreshSeconds int = 10
defaultWriteBatchDelay time.Duration = 5 * time.Second defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000 defaultWriteBatchSize int = 10000
defaultEncoding Encoding = EncodingJSON defaultEncoding Encoding = EncodingJSON
@ -58,6 +63,11 @@ type namespaceConfig struct {
ErrorTable string ErrorTable string
DependencyGraphTable string DependencyGraphTable string
TopLevelOperationsTable string TopLevelOperationsTable string
LogsDB string
LogsTable string
LogsAttributeKeysTable string
LogsResourceKeysTable string
LiveTailRefreshSeconds int
WriteBatchDelay time.Duration WriteBatchDelay time.Duration
WriteBatchSize int WriteBatchSize int
Encoding Encoding Encoding Encoding
@ -120,6 +130,11 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
SpansTable: defaultSpansTable, SpansTable: defaultSpansTable,
DependencyGraphTable: defaultDependencyGraphTable, DependencyGraphTable: defaultDependencyGraphTable,
TopLevelOperationsTable: defaultTopLevelOperationsTable, TopLevelOperationsTable: defaultTopLevelOperationsTable,
LogsDB: defaultLogsDB,
LogsTable: defaultLogsTable,
LogsAttributeKeysTable: defaultLogAttributeKeysTable,
LogsResourceKeysTable: defaultLogResourceKeysTable,
LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds,
WriteBatchDelay: defaultWriteBatchDelay, WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize, WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding, Encoding: defaultEncoding,
@ -131,16 +146,21 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
for _, namespace := range otherNamespaces { for _, namespace := range otherNamespaces {
if namespace == archiveNamespace { if namespace == archiveNamespace {
options.others[namespace] = &namespaceConfig{ options.others[namespace] = &namespaceConfig{
namespace: namespace, namespace: namespace,
Datasource: datasource, Datasource: datasource,
TraceDB: "", TraceDB: "",
OperationsTable: "", OperationsTable: "",
IndexTable: "", IndexTable: "",
ErrorTable: "", ErrorTable: "",
WriteBatchDelay: defaultWriteBatchDelay, LogsDB: "",
WriteBatchSize: defaultWriteBatchSize, LogsTable: "",
Encoding: defaultEncoding, LogsAttributeKeysTable: "",
Connector: defaultConnector, LogsResourceKeysTable: "",
LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds,
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,
} }
} else { } else {
options.others[namespace] = &namespaceConfig{namespace: namespace} options.others[namespace] = &namespaceConfig{namespace: namespace}

View File

@ -39,6 +39,7 @@ import (
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
promModel "github.com/prometheus/common/model" promModel "github.com/prometheus/common/model"
"go.signoz.io/query-service/app/logs"
"go.signoz.io/query-service/constants" "go.signoz.io/query-service/constants"
am "go.signoz.io/query-service/integrations/alertManager" am "go.signoz.io/query-service/integrations/alertManager"
"go.signoz.io/query-service/model" "go.signoz.io/query-service/model"
@ -81,18 +82,24 @@ type ClickHouseReader struct {
traceDB string traceDB string
operationsTable string operationsTable string
durationTable string durationTable string
usageExplorerTable string
indexTable string indexTable string
errorTable string errorTable string
usageExplorerTable string
spansTable string spansTable string
dependencyGraphTable string dependencyGraphTable string
topLevelOperationsTable string topLevelOperationsTable string
logsDB string
logsTable string
logsAttributeKeys string
logsResourceKeys string
queryEngine *promql.Engine queryEngine *promql.Engine
remoteStorage *remote.Storage remoteStorage *remote.Storage
promConfigFile string promConfigFile string
promConfig *config.Config promConfig *config.Config
alertManager am.Manager alertManager am.Manager
liveTailRefreshSeconds int
} }
// NewTraceReader returns a TraceReader for the database // NewTraceReader returns a TraceReader for the database
@ -127,6 +134,11 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader {
spansTable: options.primary.SpansTable, spansTable: options.primary.SpansTable,
dependencyGraphTable: options.primary.DependencyGraphTable, dependencyGraphTable: options.primary.DependencyGraphTable,
topLevelOperationsTable: options.primary.TopLevelOperationsTable, topLevelOperationsTable: options.primary.TopLevelOperationsTable,
logsDB: options.primary.LogsDB,
logsTable: options.primary.LogsTable,
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
logsResourceKeys: options.primary.LogsResourceKeysTable,
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
promConfigFile: configFile, promConfigFile: configFile,
} }
} }
@ -1972,7 +1984,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
return &GetFilteredSpansAggregatesResponse, nil return &GetFilteredSpansAggregatesResponse, nil
} }
// SetTTL sets the TTL for traces or metrics tables. // SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL. // This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db. // Status of TTL update is tracked with ttl_status table in sqlite db.
func (r *ClickHouseReader) SetTTL(ctx context.Context, func (r *ClickHouseReader) SetTTL(ctx context.Context,
@ -2101,6 +2113,59 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
return return
} }
}(tableName) }(tableName)
case constants.LogsTTL:
tableName = r.logsDB + "." + r.logsTable
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
go func(tableName string) {
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
if dbErr != nil {
zap.S().Error(fmt.Errorf("error in inserting to ttl_status table: %s", dbErr.Error()))
return
}
req = fmt.Sprintf(
"ALTER TABLE %v MODIFY TTL toDateTime(timestamp / 1000000000) + "+
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.S().Error(fmt.Errorf("error in setting cold storage: %s", err.Err.Error()))
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err == nil {
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}
return
}
zap.S().Debugf("Executing TTL request: %s\n", req)
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
if err := r.db.Exec(ctx, req); err != nil {
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
return
}
_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}(tableName)
default: default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v", return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v",
@ -2264,6 +2329,24 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
} }
} }
getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL
query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsTable, r.logsDB)
err := r.db.Select(ctx, &dbResp, query)
if err != nil {
zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. Err=%v", err)}
}
if len(dbResp) == 0 {
return nil, nil
} else {
return &dbResp[0], nil
}
}
switch ttlParams.Type { switch ttlParams.Type {
case constants.TraceTTL: case constants.TraceTTL:
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable} tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable, signozTraceDBName + "." + signozUsageExplorerTable, signozTraceDBName + "." + defaultDependencyGraphTable}
@ -2308,6 +2391,29 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
delTTL, moveTTL := parseTTL(dbResp.EngineFull) delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
case constants.LogsTTL:
tableNameArray := []string{r.logsDB + "." + r.logsTable}
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
}
dbResp, err := getLogsTTL()
if err != nil {
return nil, err
}
ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
if err != nil {
return nil, err
}
ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
if ttlQuery.ColdStorageTtl != -1 {
ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil
default: default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v", return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",
ttlParams.Type)} ttlParams.Type)}
@ -2849,3 +2955,272 @@ func (r *ClickHouseReader) GetSamplesInfoInLastHeartBeatInterval(ctx context.Con
return totalSamples, nil return totalSamples, nil
} }
func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) {
// response will contain top level fields from the otel log model
response := model.GetFieldsResponse{
Selected: constants.StaticSelectedLogFields,
Interesting: []model.LogField{},
}
// get attribute keys
attributes := []model.LogField{}
query := fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsAttributeKeys)
err := r.db.Select(ctx, &attributes, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
// get resource keys
resources := []model.LogField{}
query = fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsResourceKeys)
err = r.db.Select(ctx, &resources, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
statements := []model.ShowCreateTableStatement{}
query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.logsDB, r.logsTable)
err = r.db.Select(ctx, &statements, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
extractSelectedAndInterestingFields(statements[0].Statement, constants.Attributes, &attributes, &response)
extractSelectedAndInterestingFields(statements[0].Statement, constants.Resources, &resources, &response)
extractSelectedAndInterestingFields(statements[0].Statement, constants.Static, &constants.StaticInterestingLogFields, &response)
return &response, nil
}
func extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) {
for _, field := range *fields {
field.Type = fieldType
if strings.Contains(tableStatement, fmt.Sprintf("INDEX %s_idx", field.Name)) {
response.Selected = append(response.Selected, field)
} else {
response.Interesting = append(response.Interesting, field)
}
}
}
func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError {
// if a field is selected it means that the field needs to be indexed
if field.Selected {
// if the type is attribute or resource, create the materialized column first
if field.Type == constants.Attributes || field.Type == constants.Resources {
// create materialized
query := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s_%s_value[indexOf(%s_%s_key, '%s')]", r.logsDB, r.logsTable, field.Name, field.DataType, field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), field.Name)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
}
// create the index
if field.IndexType == "" {
field.IndexType = constants.DefaultLogSkipIndexType
}
if field.IndexGranularity == 0 {
field.IndexGranularity = constants.DefaultLogSkipIndexGranularity
}
query := fmt.Sprintf("ALTER TABLE %s.%s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d", r.logsDB, r.logsTable, field.Name, field.Name, field.IndexType, field.IndexGranularity)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
} else {
// remove index
query := fmt.Sprintf("ALTER TABLE %s.%s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsTable, field.Name)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
}
return nil
}
func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError) {
response := []model.GetLogsResponse{}
fields, apiErr := r.GetLogFields(ctx)
if apiErr != nil {
return nil, apiErr
}
isPaginatePrev := logs.CheckIfPrevousPaginateAndModifyOrder(params)
filterSql, err := logs.GenerateSQLWhere(fields, params)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData}
}
query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable)
if filterSql != "" {
query = fmt.Sprintf("%s where %s", query, filterSql)
}
query = fmt.Sprintf("%s order by %s %s limit %d", query, params.OrderBy, params.Order, params.Limit)
zap.S().Debug(query)
err = r.db.Select(ctx, &response, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
if isPaginatePrev {
// rever the results from db
for i, j := 0, len(response)-1; i < j; i, j = i+1, j-1 {
response[i], response[j] = response[j], response[i]
}
}
return &response, nil
}
func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailClient) {
fields, apiErr := r.GetLogFields(ctx)
if apiErr != nil {
client.Error <- apiErr.Err
return
}
filterSql, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{
Query: client.Filter.Query,
})
if err != nil {
client.Error <- err
return
}
query := fmt.Sprintf("%s from %s.%s", constants.LogsSQLSelect, r.logsDB, r.logsTable)
tsStart := uint64(time.Now().UnixNano())
if client.Filter.TimestampStart != 0 {
tsStart = client.Filter.TimestampStart
}
var idStart string
if client.Filter.IdGt != "" {
idStart = client.Filter.IdGt
}
ticker := time.NewTicker(time.Duration(r.liveTailRefreshSeconds) * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
done := true
client.Done <- &done
zap.S().Debug("closing go routine : " + client.Name)
return
case <-ticker.C:
// get the new 100 logs as anything more older won't make sense
tmpQuery := fmt.Sprintf("%s where timestamp >='%d'", query, tsStart)
if filterSql != "" {
tmpQuery = fmt.Sprintf("%s and %s", tmpQuery, filterSql)
}
if idStart != "" {
tmpQuery = fmt.Sprintf("%s and id > '%s'", tmpQuery, idStart)
}
tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)
zap.S().Debug(tmpQuery)
response := []model.GetLogsResponse{}
err := r.db.Select(ctx, &response, tmpQuery)
if err != nil {
zap.S().Error(err)
client.Error <- err
return
}
for i := len(response) - 1; i >= 0; i-- {
select {
case <-ctx.Done():
done := true
client.Done <- &done
zap.S().Debug("closing go routine while sending logs : " + client.Name)
return
default:
client.Logs <- &response[i]
if i == 0 {
tsStart = response[i].Timestamp
idStart = response[i].ID
}
}
}
}
}
}
func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError) {
logAggregatesDBResponseItems := []model.LogsAggregatesDBResponseItem{}
function := "toFloat64(count()) as value"
if params.Function != "" {
function = fmt.Sprintf("toFloat64(%s) as value", params.Function)
}
fields, apiErr := r.GetLogFields(ctx)
if apiErr != nil {
return nil, apiErr
}
filterSql, err := logs.GenerateSQLWhere(fields, &model.LogsFilterParams{
Query: params.Query,
})
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorBadData}
}
query := ""
if params.GroupBy != "" {
query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, toString(%s) as groupBy, "+
"%s "+
"FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ",
params.StepSeconds/60, params.GroupBy, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd)
} else {
query = fmt.Sprintf("SELECT toInt64(toUnixTimestamp(toStartOfInterval(toDateTime(timestamp/1000000000), INTERVAL %d minute))*1000000000) as time, "+
"%s "+
"FROM %s.%s WHERE timestamp >= '%d' AND timestamp <= '%d' ",
params.StepSeconds/60, function, r.logsDB, r.logsTable, params.TimestampStart, params.TimestampEnd)
}
if filterSql != "" {
query = fmt.Sprintf("%s AND %s ", query, filterSql)
}
if params.GroupBy != "" {
query = fmt.Sprintf("%s GROUP BY time, toString(%s) as groupBy ORDER BY time", query, params.GroupBy)
} else {
query = fmt.Sprintf("%s GROUP BY time ORDER BY time", query)
}
zap.S().Debug(query)
err = r.db.Select(ctx, &logAggregatesDBResponseItems, query)
if err != nil {
return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal}
}
aggregateResponse := model.GetLogsAggregatesResponse{
Items: make(map[int64]model.LogsAggregatesResponseItem),
}
for i := range logAggregatesDBResponseItems {
if elem, ok := aggregateResponse.Items[int64(logAggregatesDBResponseItems[i].Timestamp)]; ok {
if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" {
elem.GroupBy[logAggregatesDBResponseItems[i].GroupBy] = logAggregatesDBResponseItems[i].Value
}
aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = elem
} else {
if params.GroupBy != "" && logAggregatesDBResponseItems[i].GroupBy != "" {
aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{
Timestamp: logAggregatesDBResponseItems[i].Timestamp,
GroupBy: map[string]interface{}{logAggregatesDBResponseItems[i].GroupBy: logAggregatesDBResponseItems[i].Value},
}
} else if params.GroupBy == "" {
aggregateResponse.Items[logAggregatesDBResponseItems[i].Timestamp] = model.LogsAggregatesResponseItem{
Timestamp: logAggregatesDBResponseItems[i].Timestamp,
Value: logAggregatesDBResponseItems[i].Value,
}
}
}
}
return &aggregateResponse, nil
}

View File

@ -1,6 +1,7 @@
package app package app
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -16,6 +17,7 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql"
"go.signoz.io/query-service/app/dashboards" "go.signoz.io/query-service/app/dashboards"
"go.signoz.io/query-service/app/logs"
"go.signoz.io/query-service/app/metrics" "go.signoz.io/query-service/app/metrics"
"go.signoz.io/query-service/app/parser" "go.signoz.io/query-service/app/parser"
"go.signoz.io/query-service/auth" "go.signoz.io/query-service/auth"
@ -1922,3 +1924,120 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Write(resp) w.Write(resp)
} }
// logs
func (aH *APIHandler) RegisterLogsRoutes(router *mux.Router) {
subRouter := router.PathPrefix("/api/v1/logs").Subrouter()
subRouter.HandleFunc("", ViewAccess(aH.getLogs)).Methods(http.MethodGet)
subRouter.HandleFunc("/tail", ViewAccess(aH.tailLogs)).Methods(http.MethodGet)
subRouter.HandleFunc("/fields", ViewAccess(aH.logFields)).Methods(http.MethodGet)
subRouter.HandleFunc("/fields", EditAccess(aH.logFieldUpdate)).Methods(http.MethodPost)
subRouter.HandleFunc("/aggregate", ViewAccess(aH.logAggregate)).Methods(http.MethodGet)
}
func (aH *APIHandler) logFields(w http.ResponseWriter, r *http.Request) {
fields, apiErr := (*aH.reader).GetLogFields(r.Context())
if apiErr != nil {
respondError(w, apiErr, "Failed to fetch fields from the DB")
return
}
aH.writeJSON(w, r, fields)
}
func (aH *APIHandler) logFieldUpdate(w http.ResponseWriter, r *http.Request) {
field := model.UpdateField{}
if err := json.NewDecoder(r.Body).Decode(&field); err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Failed to decode payload")
return
}
err := logs.ValidateUpdateFieldPayload(&field)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Incorrect payload")
return
}
apiErr := (*aH.reader).UpdateLogField(r.Context(), &field)
if apiErr != nil {
respondError(w, apiErr, "Failed to update filed in the DB")
return
}
aH.writeJSON(w, r, field)
}
func (aH *APIHandler) getLogs(w http.ResponseWriter, r *http.Request) {
params, err := logs.ParseLogFilterParams(r)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Incorrect params")
return
}
res, apiErr := (*aH.reader).GetLogs(r.Context(), params)
if apiErr != nil {
respondError(w, apiErr, "Failed to fetch logs from the DB")
return
}
aH.writeJSON(w, r, map[string]interface{}{"results": res})
}
func (aH *APIHandler) tailLogs(w http.ResponseWriter, r *http.Request) {
params, err := logs.ParseLogFilterParams(r)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Incorrect params")
return
}
// create the client
client := &model.LogsTailClient{Name: r.RemoteAddr, Logs: make(chan *model.GetLogsResponse, 1000), Done: make(chan *bool), Error: make(chan error), Filter: *params}
go (*aH.reader).TailLogs(r.Context(), client)
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(200)
flusher, ok := w.(http.Flusher)
if !ok {
err := model.ApiError{Typ: model.ErrorStreamingNotSupported, Err: nil}
respondError(w, &err, "streaming is not supported")
return
}
for {
select {
case log := <-client.Logs:
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.Encode(log)
fmt.Fprintf(w, "data: %v\n\n", buf.String())
flusher.Flush()
case <-client.Done:
zap.S().Debug("done!")
return
case err := <-client.Error:
zap.S().Error("error occured!", err)
return
}
}
}
func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
params, err := logs.ParseLogAggregateParams(r)
if err != nil {
apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErr, "Incorrect params")
return
}
res, apiErr := (*aH.reader).AggregateLogs(r.Context(), params)
if apiErr != nil {
respondError(w, apiErr, "Failed to fetch logs aggregate from the DB")
return
}
aH.writeJSON(w, r, res)
}

View File

@ -0,0 +1,320 @@
package logs
import (
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
"go.signoz.io/query-service/model"
)
var operatorMapping = map[string]string{
"lt": "<",
"gt": ">",
"lte": "<=",
"gte": ">=",
"in": "IN",
"nin": "NOT IN",
"contains": "ILIKE",
"ncontains": "NOT ILIKE",
}
const (
AND = "and"
OR = "or"
ORDER = "order"
ORDER_BY = "orderBy"
TIMESTAMP_START = "timestampStart"
TIMESTAMP_END = "timestampEnd"
IdGt = "idGt"
IdLT = "idLt"
TIMESTAMP = "timestamp"
ASC = "asc"
DESC = "desc"
)
var tokenRegex, _ = regexp.Compile(`(?i)(and( )*?|or( )*?)?(([\w.-]+ (in|nin) \([^(]+\))|([\w.]+ (gt|lt|gte|lte) (')?[\S]+(')?)|([\w.]+ (contains|ncontains)) '[^']+')`)
var operatorRegex, _ = regexp.Compile(`(?i)(?: )(in|nin|gt|lt|gte|lte|contains|ncontains)(?: )`)
func ParseLogFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
res := model.LogsFilterParams{
Limit: 30,
OrderBy: "timestamp",
Order: "desc",
}
var err error
params := r.URL.Query()
if val, ok := params["limit"]; ok {
res.Limit, err = strconv.Atoi(val[0])
if err != nil {
return nil, err
}
}
if val, ok := params[ORDER_BY]; ok {
res.OrderBy = val[0]
}
if val, ok := params[ORDER]; ok {
res.Order = val[0]
}
if val, ok := params["q"]; ok {
res.Query = val[0]
}
if val, ok := params[TIMESTAMP_START]; ok {
ts, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
res.TimestampStart = uint64(ts)
}
if val, ok := params[TIMESTAMP_END]; ok {
ts, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
res.TimestampEnd = uint64(ts)
}
if val, ok := params[IdGt]; ok {
res.IdGt = val[0]
}
if val, ok := params[IdLT]; ok {
res.IdLT = val[0]
}
return &res, nil
}
func ParseLiveTailFilterParams(r *http.Request) (*model.LogsFilterParams, error) {
res := model.LogsFilterParams{}
params := r.URL.Query()
if val, ok := params["q"]; ok {
res.Query = val[0]
}
if val, ok := params[TIMESTAMP_START]; ok {
ts, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
res.TimestampStart = uint64(ts)
}
if val, ok := params[IdGt]; ok {
res.IdGt = val[0]
}
return &res, nil
}
func ParseLogAggregateParams(r *http.Request) (*model.LogsAggregateParams, error) {
res := model.LogsAggregateParams{}
params := r.URL.Query()
if val, ok := params[TIMESTAMP_START]; ok {
ts, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
res.TimestampStart = uint64(ts)
} else {
return nil, fmt.Errorf("timestampStart is required")
}
if val, ok := params[TIMESTAMP_END]; ok {
ts, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
res.TimestampEnd = uint64(ts)
} else {
return nil, fmt.Errorf("timestampEnd is required")
}
if val, ok := params["q"]; ok {
res.Query = val[0]
}
if val, ok := params["groupBy"]; ok {
res.GroupBy = val[0]
}
if val, ok := params["function"]; ok {
res.Function = val[0]
}
if val, ok := params["step"]; ok {
step, err := strconv.Atoi(val[0])
if err != nil {
return nil, err
}
res.StepSeconds = step
} else {
return nil, fmt.Errorf("step is required")
}
return &res, nil
}
func parseLogQuery(query string) ([]string, error) {
sqlQueryTokens := []string{}
filterTokens := tokenRegex.FindAllString(query, -1)
if len(filterTokens) == 0 {
sqlQueryTokens = append(sqlQueryTokens, fmt.Sprintf("body ILIKE '%%%s%%' ", query))
return sqlQueryTokens, nil
}
// replace and check if there is something that is lying around
if len(strings.TrimSpace(tokenRegex.ReplaceAllString(query, ""))) > 0 {
return nil, fmt.Errorf("failed to parse query, contains unknown tokens")
}
for _, v := range filterTokens {
op := strings.TrimSpace(operatorRegex.FindString(v))
opLower := strings.ToLower(op)
if opLower == "contains" || opLower == "ncontains" {
searchString := strings.TrimSpace(strings.Split(v, op)[1])
operatorRemovedTokens := strings.Split(operatorRegex.ReplaceAllString(v, " "), " ")
searchCol := strings.ToLower(operatorRemovedTokens[0])
if searchCol == AND || searchCol == OR {
searchCol = strings.ToLower(operatorRemovedTokens[1])
}
col := searchCol
if strings.ToLower(searchCol) == "fulltext" {
col = "body"
}
f := fmt.Sprintf(`%s %s '%%%s%%' `, col, operatorMapping[opLower], searchString[1:len(searchString)-1])
if strings.HasPrefix(strings.ToLower(v), AND) {
f = "AND " + f
} else if strings.HasPrefix(strings.ToLower(v), OR) {
f = "OR " + f
}
sqlQueryTokens = append(sqlQueryTokens, f)
} else {
symbol := operatorMapping[strings.ToLower(op)]
sqlQueryTokens = append(sqlQueryTokens, strings.Replace(v, " "+op+" ", " "+symbol+" ", 1)+" ")
}
}
return sqlQueryTokens, nil
}
func parseColumn(s string) (*string, error) {
s = strings.ToLower(s)
colName := ""
// if has and/or as prefix
filter := strings.Split(s, " ")
if len(filter) < 3 {
return nil, fmt.Errorf("incorrect filter")
}
if strings.HasPrefix(s, AND) || strings.HasPrefix(s, OR) {
colName = filter[1]
} else {
colName = filter[0]
}
return &colName, nil
}
func arrayToMap(fields []model.LogField) map[string]model.LogField {
res := map[string]model.LogField{}
for _, field := range fields {
res[field.Name] = field
}
return res
}
func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens []string) ([]string, error) {
// check if cols
selectedFieldsLookup := arrayToMap(allFields.Selected)
interestingFieldLookup := arrayToMap(allFields.Interesting)
for index := 0; index < len(queryTokens); index++ {
queryToken := queryTokens[index]
col, err := parseColumn(queryToken)
if err != nil {
return nil, err
}
sqlColName := *col
if _, ok := selectedFieldsLookup[*col]; !ok && *col != "body" {
if field, ok := interestingFieldLookup[*col]; ok {
sqlColName = fmt.Sprintf("%s_%s_value[indexOf(%s_%s_key, '%s')]", field.Type, strings.ToLower(field.DataType), field.Type, strings.ToLower(field.DataType), *col)
} else if strings.Compare(strings.ToLower(*col), "fulltext") != 0 {
return nil, fmt.Errorf("field not found for filtering")
}
}
queryTokens[index] = strings.Replace(queryToken, *col, sqlColName, 1)
}
return queryTokens, nil
}
func CheckIfPrevousPaginateAndModifyOrder(params *model.LogsFilterParams) (isPaginatePrevious bool) {
if params.IdGt != "" && params.OrderBy == TIMESTAMP && params.Order == DESC {
isPaginatePrevious = true
params.Order = ASC
} else if params.IdLT != "" && params.OrderBy == TIMESTAMP && params.Order == ASC {
isPaginatePrevious = true
params.Order = DESC
}
return
}
func GenerateSQLWhere(allFields *model.GetFieldsResponse, params *model.LogsFilterParams) (string, error) {
var tokens []string
var err error
var sqlWhere string
if params.Query != "" {
tokens, err = parseLogQuery(params.Query)
if err != nil {
return sqlWhere, err
}
}
tokens, err = replaceInterestingFields(allFields, tokens)
if err != nil {
return sqlWhere, err
}
filterTokens := []string{}
if params.TimestampStart != 0 {
filter := fmt.Sprintf("timestamp >= '%d' ", params.TimestampStart)
if len(filterTokens) > 0 {
filter = "and " + filter
}
filterTokens = append(filterTokens, filter)
}
if params.TimestampEnd != 0 {
filter := fmt.Sprintf("timestamp <= '%d' ", params.TimestampEnd)
if len(filterTokens) > 0 {
filter = "and " + filter
}
filterTokens = append(filterTokens, filter)
}
if params.IdGt != "" {
filter := fmt.Sprintf("id > '%v' ", params.IdGt)
if len(filterTokens) > 0 {
filter = "and " + filter
}
filterTokens = append(filterTokens, filter)
}
if params.IdLT != "" {
filter := fmt.Sprintf("id < '%v' ", params.IdLT)
if len(filterTokens) > 0 {
filter = "and " + filter
}
filterTokens = append(filterTokens, filter)
}
if len(filterTokens) > 0 {
if len(tokens) > 0 {
tokens[0] = fmt.Sprintf("and %s", tokens[0])
}
filterTokens = append(filterTokens, tokens...)
tokens = filterTokens
}
sqlWhere = strings.Join(tokens, "")
return sqlWhere, nil
}

View File

@ -0,0 +1,302 @@
package logs
import (
"testing"
. "github.com/smartystreets/goconvey/convey"
"go.signoz.io/query-service/model"
)
var correctQueriesTest = []struct {
Name string
InputQuery string
WantSqlTokens []string
}{
{
`filter with fulltext`,
`OPERATION in ('bcd') AND FULLTEXT contains 'searchstring'`,
[]string{`OPERATION IN ('bcd') `, `AND body ILIKE '%searchstring%' `},
},
{
`fulltext`,
`searchstring`,
[]string{`body ILIKE '%searchstring%' `},
},
{
`fulltext with quotes and space`,
`FULLTEXT contains 'Hello, "World"'`,
[]string{`body ILIKE '%Hello, "World"%' `},
},
{
`contains search with a different attributes`,
`resource contains 'Hello, "World"'`,
[]string{`resource ILIKE '%Hello, "World"%' `},
},
{
`more than one continas`,
`resource contains 'Hello, "World"' and myresource contains 'abcd'`,
[]string{`resource ILIKE '%Hello, "World"%' `, `AND myresource ILIKE '%abcd%' `},
},
{
"contains with or",
`id in ('2CkBCauK8m3nkyKR19YhCw6WbdY') or fulltext contains 'OPTIONS /api/v1/logs'`,
[]string{`id IN ('2CkBCauK8m3nkyKR19YhCw6WbdY') `, `OR body ILIKE '%OPTIONS /api/v1/logs%' `},
},
{
"mixing and or",
`id in ('2CkBCauK8m3nkyKR19YhCw6WbdY') and id in ('2CkBCauK8m3nkyKR19YhCw6WbdY','2CkBCauK8m3nkyKR19YhCw6WbdY') or fulltext contains 'OPTIONS /api/v1/logs'`,
[]string{`id IN ('2CkBCauK8m3nkyKR19YhCw6WbdY') `, `and id IN ('2CkBCauK8m3nkyKR19YhCw6WbdY','2CkBCauK8m3nkyKR19YhCw6WbdY') `, `OR body ILIKE '%OPTIONS /api/v1/logs%' `},
},
{
`filters with lt,gt,lte,gte operators`,
`id lt 100 and id gt 50 and code lte 500 and code gte 400`,
[]string{`id < 100 `, `and id > 50 `, `and code <= 500 `, `and code >= 400 `},
},
{
`filters with lt,gt,lte,gte operators seprated by OR`,
`id lt 100 or id gt 50 or code lte 500 or code gte 400`,
[]string{`id < 100 `, `or id > 50 `, `or code <= 500 `, `or code >= 400 `},
},
{
`filter with number`,
`status gte 200 AND FULLTEXT ncontains '"key"'`,
[]string{`status >= 200 `, `AND body NOT ILIKE '%"key"%' `},
},
{
`characters inside string`,
`service NIN ('name > 100') AND length gt 100`,
[]string{`service NOT IN ('name > 100') `, `AND length > 100 `},
},
{
`fulltext with in`,
`key in 2`,
[]string{`body ILIKE '%key in 2%' `},
},
{
`not valid fulltext but a filter`,
`key in (2,3)`,
[]string{`key IN (2,3) `},
},
{
`filters with extra spaces`,
`service IN ('name > 100') AND length gt 100`,
[]string{`service IN ('name > 100') `, `AND length > 100 `},
},
{
`filters with special characters in key name`,
`id.userid in (100) and id_userid gt 50`,
[]string{`id.userid IN (100) `, `and id_userid > 50 `},
},
}
func TestParseLogQueryCorrect(t *testing.T) {
for _, test := range correctQueriesTest {
Convey(test.Name, t, func() {
query, _ := parseLogQuery(test.InputQuery)
So(query, ShouldResemble, test.WantSqlTokens)
})
}
}
var incorrectQueries = []struct {
Name string
Query string
}{
{
"filter without a key",
"OPERATION in ('bcd') AND 'abcd' FULLTEXT contains 'helloxyz'",
},
{
"fulltext without fulltext keyword",
"OPERATION in ('bcd') AND 'searchstring'",
},
{
"fulltext in the beginning without keyword",
"'searchstring and OPERATION in ('bcd')",
},
}
func TestParseLogQueryInCorrect(t *testing.T) {
for _, test := range incorrectQueries {
Convey(test.Name, t, func() {
_, err := parseLogQuery(test.Query)
So(err, ShouldBeError)
})
}
}
var parseCorrectColumns = []struct {
Name string
Filter string
Column string
}{
{
"column with IN operator",
"id.userid IN (100) ",
"id.userid",
},
{
"column with NOT IN operator",
"service NOT IN ('name > 100') ",
"service",
},
{
"column with > operator",
"and id_userid > 50 ",
"id_userid",
},
{
"column with < operator",
"and id_userid < 50 ",
"id_userid",
},
{
"column with <= operator",
"and id_userid <= 50 ",
"id_userid",
},
{
"column with >= operator",
"and id_userid >= 50 ",
"id_userid",
},
{
"column with ilike",
`AND body ILIKE '%searchstring%' `,
"body",
},
{
"column with not ilike",
`AND body ILIKE '%searchstring%' `,
"body",
},
}
func TestParseColumn(t *testing.T) {
for _, test := range parseCorrectColumns {
Convey(test.Name, t, func() {
column, _ := parseColumn(test.Filter)
So(*column, ShouldEqual, test.Column)
})
}
}
func TestReplaceInterestingFields(t *testing.T) {
queryTokens := []string{"id.userid IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`}
allFields := model.GetFieldsResponse{
Selected: []model.LogField{
model.LogField{
Name: "id_key",
DataType: "int64",
Type: "attributes",
},
},
Interesting: []model.LogField{
model.LogField{
Name: "id.userid",
DataType: "int64",
Type: "attributes",
},
},
}
expectedTokens := []string{"attributes_int64_value[indexOf(attributes_int64_key, 'id.userid')] IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`}
Convey("testInterestingFields", t, func() {
tokens, _ := replaceInterestingFields(&allFields, queryTokens)
So(tokens, ShouldResemble, expectedTokens)
})
}
var previousPaginateTestCases = []struct {
Name string
Filter model.LogsFilterParams
IsPaginatePrev bool
Order string
}{
{
Name: "empty",
Filter: model.LogsFilterParams{},
IsPaginatePrev: false,
},
{
Name: "next ordery by asc",
Filter: model.LogsFilterParams{
OrderBy: TIMESTAMP,
Order: ASC,
IdGt: "myid",
},
IsPaginatePrev: false,
Order: ASC,
},
{
Name: "next ordery by desc",
Filter: model.LogsFilterParams{
OrderBy: TIMESTAMP,
Order: DESC,
IdLT: "myid",
},
IsPaginatePrev: false,
Order: DESC,
},
{
Name: "prev ordery by desc",
Filter: model.LogsFilterParams{
OrderBy: TIMESTAMP,
Order: DESC,
IdGt: "myid",
},
IsPaginatePrev: true,
Order: ASC,
},
{
Name: "prev ordery by asc",
Filter: model.LogsFilterParams{
OrderBy: TIMESTAMP,
Order: ASC,
IdLT: "myid",
},
IsPaginatePrev: true,
Order: DESC,
},
}
func TestCheckIfPrevousPaginateAndModifyOrder(t *testing.T) {
for _, test := range previousPaginateTestCases {
Convey(test.Name, t, func() {
isPrevPaginate := CheckIfPrevousPaginateAndModifyOrder(&test.Filter)
So(isPrevPaginate, ShouldEqual, test.IsPaginatePrev)
So(test.Order, ShouldEqual, test.Filter.Order)
})
}
}
func TestGenerateSQLQuery(t *testing.T) {
allFields := model.GetFieldsResponse{
Selected: []model.LogField{
{
Name: "id",
DataType: "int64",
Type: "attributes",
},
},
Interesting: []model.LogField{
{
Name: "code",
DataType: "int64",
Type: "attributes",
},
},
}
query := "id lt 100 and id gt 50 and code lte 500 and code gte 400"
tsStart := uint64(1657689292000)
tsEnd := uint64(1657689294000)
idStart := "2BsKLKv8cZrLCn6rkOcRGkdjBdM"
idEnd := "2BsKG6tRpFWjYMcWsAGKfSxoQdU"
sqlWhere := "timestamp >= '1657689292000' and timestamp <= '1657689294000' and id > '2BsKLKv8cZrLCn6rkOcRGkdjBdM' and id < '2BsKG6tRpFWjYMcWsAGKfSxoQdU' and id < 100 and id > 50 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] <= 500 and attributes_int64_value[indexOf(attributes_int64_key, 'code')] >= 400 "
Convey("testGenerateSQL", t, func() {
res, _ := GenerateSQLWhere(&allFields, &model.LogsFilterParams{Query: query, TimestampStart: tsStart, TimestampEnd: tsEnd, IdGt: idStart, IdLT: idEnd})
So(res, ShouldEqual, sqlWhere)
})
}

View File

@ -0,0 +1,40 @@
package logs
import (
"fmt"
"regexp"
"go.signoz.io/query-service/constants"
"go.signoz.io/query-service/model"
)
func ValidateUpdateFieldPayload(field *model.UpdateField) error {
if field.Name == "" {
return fmt.Errorf("name cannot be empty")
}
if field.Type == "" {
return fmt.Errorf("type cannot be empty")
}
if field.DataType == "" {
return fmt.Errorf("dataType cannot be empty")
}
matched, err := regexp.MatchString(fmt.Sprintf("^(%s|%s|%s)$", constants.Static, constants.Attributes, constants.Resources), field.Type)
if err != nil {
return err
}
if !matched {
return fmt.Errorf("type %s not supported", field.Type)
}
if field.IndexType != "" {
matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, field.IndexType)
if err != nil {
return err
}
if !matched {
return fmt.Errorf("index type %s not supported", field.IndexType)
}
}
return nil
}

View File

@ -590,8 +590,8 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
} }
// Validate the type parameter // Validate the type parameter
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL) return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
} }
// Validate the TTL duration. // Validate the TTL duration.
@ -629,8 +629,8 @@ func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
return nil, fmt.Errorf("type param cannot be empty from the query") return nil, fmt.Errorf("type param cannot be empty from the query")
} else { } else {
// Validate the type parameter // Validate the type parameter
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL { if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL) return nil, fmt.Errorf("type param should be metrics|traces|logs, got %v", typeTTL)
} }
} }

View File

@ -162,11 +162,12 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
api.RegisterRoutes(r) api.RegisterRoutes(r)
api.RegisterMetricsRoutes(r) api.RegisterMetricsRoutes(r)
api.RegisterLogsRoutes(r)
c := cors.New(cors.Options{ c := cors.New(cors.Options{
AllowedOrigins: []string{"*"}, AllowedOrigins: []string{"*"},
AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH"}, AllowedMethods: []string{"GET", "DELETE", "POST", "PUT", "PATCH", "OPTIONS"},
AllowedHeaders: []string{"Accept", "Authorization", "Content-Type"}, AllowedHeaders: []string{"Accept", "Authorization", "Content-Type", "cache-control"},
}) })
handler := c.Handler(r) handler := c.Handler(r)
@ -217,6 +218,11 @@ func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.ResponseWriter.WriteHeader(code) lrw.ResponseWriter.WriteHeader(code)
} }
// Flush implements the http.Flush interface.
func (lrw *loggingResponseWriter) Flush() {
lrw.ResponseWriter.(http.Flusher).Flush()
}
func (s *Server) analyticsMiddleware(next http.Handler) http.Handler { func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
route := mux.CurrentRoute(r) route := mux.CurrentRoute(r)
@ -236,8 +242,14 @@ func (s *Server) analyticsMiddleware(next http.Handler) http.Handler {
func setTimeoutMiddleware(next http.Handler) http.Handler { func setTimeoutMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), constants.ContextTimeout*time.Second) ctx := r.Context()
defer cancel() var cancel context.CancelFunc
// check if route is not excluded
url := r.URL.Path
if _, ok := constants.TimeoutExcludedRoutes[url]; !ok {
ctx, cancel = context.WithTimeout(r.Context(), constants.ContextTimeout*time.Second)
defer cancel()
}
r = r.WithContext(ctx) r = r.WithContext(ctx)
next.ServeHTTP(w, r) next.ServeHTTP(w, r)

View File

@ -3,6 +3,8 @@ package constants
import ( import (
"os" "os"
"strconv" "strconv"
"go.signoz.io/query-service/model"
) )
const ( const (
@ -24,6 +26,7 @@ func IsTelemetryEnabled() bool {
const TraceTTL = "traces" const TraceTTL = "traces"
const MetricsTTL = "metrics" const MetricsTTL = "metrics"
const LogsTTL = "logs"
func GetAlertManagerApiPrefix() string { func GetAlertManagerApiPrefix() string {
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" { if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
@ -38,35 +41,40 @@ var AmChannelApiPath = GetOrDefaultEnv("ALERTMANAGER_API_CHANNEL_PATH", "v1/rout
var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db") var RELATIONAL_DATASOURCE_PATH = GetOrDefaultEnv("SIGNOZ_LOCAL_DB_PATH", "/var/lib/signoz/signoz.db")
const ( const (
ServiceName = "serviceName" ServiceName = "serviceName"
HttpRoute = "httpRoute" HttpRoute = "httpRoute"
HttpCode = "httpCode" HttpCode = "httpCode"
HttpHost = "httpHost" HttpHost = "httpHost"
HttpUrl = "httpUrl" HttpUrl = "httpUrl"
HttpMethod = "httpMethod" HttpMethod = "httpMethod"
Component = "component" Component = "component"
OperationDB = "name" OperationDB = "name"
OperationRequest = "operation" OperationRequest = "operation"
Status = "status" Status = "status"
Duration = "duration" Duration = "duration"
DBName = "dbName" DBName = "dbName"
DBOperation = "dbOperation" DBOperation = "dbOperation"
DBSystem = "dbSystem" DBSystem = "dbSystem"
MsgSystem = "msgSystem" MsgSystem = "msgSystem"
MsgOperation = "msgOperation" MsgOperation = "msgOperation"
Timestamp = "timestamp" Timestamp = "timestamp"
RPCMethod = "rpcMethod" RPCMethod = "rpcMethod"
ResponseStatusCode = "responseStatusCode" ResponseStatusCode = "responseStatusCode"
Descending = "descending" Descending = "descending"
Ascending = "ascending" Ascending = "ascending"
ContextTimeout = 60 // seconds ContextTimeout = 60 // seconds
StatusPending = "pending" StatusPending = "pending"
StatusFailed = "failed" StatusFailed = "failed"
StatusSuccess = "success" StatusSuccess = "success"
ExceptionType = "exceptionType" ExceptionType = "exceptionType"
ExceptionCount = "exceptionCount" ExceptionCount = "exceptionCount"
LastSeen = "lastSeen" LastSeen = "lastSeen"
FirstSeen = "firstSeen" FirstSeen = "firstSeen"
Attributes = "attributes"
Resources = "resources"
Static = "static"
DefaultLogSkipIndexType = "bloom_filter(0.01)"
DefaultLogSkipIndexGranularity = 64
) )
const ( const (
SIGNOZ_METRIC_DBNAME = "signoz_metrics" SIGNOZ_METRIC_DBNAME = "signoz_metrics"
@ -74,6 +82,10 @@ const (
SIGNOZ_TIMESERIES_TABLENAME = "time_series_v2" SIGNOZ_TIMESERIES_TABLENAME = "time_series_v2"
) )
var TimeoutExcludedRoutes = map[string]bool{
"/api/v1/logs/tail": true,
}
// alert related constants // alert related constants
const ( const (
// AlertHelpPage is used in case default alert repo url is not set // AlertHelpPage is used in case default alert repo url is not set
@ -87,3 +99,61 @@ func GetOrDefaultEnv(key string, fallback string) string {
} }
return v return v
} }
const (
STRING = "String"
UINT32 = "UInt32"
LOWCARDINALITY_STRING = "LowCardinality(String)"
INT32 = "Int32"
UINT8 = "Uint8"
)
var StaticInterestingLogFields = []model.LogField{
{
Name: "trace_id",
DataType: STRING,
Type: Static,
},
{
Name: "span_id",
DataType: STRING,
Type: Static,
},
{
Name: "trace_flags",
DataType: UINT32,
Type: Static,
},
{
Name: "severity_text",
DataType: LOWCARDINALITY_STRING,
Type: Static,
},
{
Name: "severity_number",
DataType: UINT8,
Type: Static,
},
}
var StaticSelectedLogFields = []model.LogField{
{
Name: "timestamp",
DataType: UINT32,
Type: Static,
},
{
Name: "id",
DataType: STRING,
Type: Static,
},
}
const (
LogsSQLSelect = "SELECT " +
"timestamp, id, trace_id, span_id, trace_flags, severity_text, severity_number, body," +
"CAST((attributes_string_key, attributes_string_value), 'Map(String, String)') as attributes_string," +
"CAST((attributes_int64_key, attributes_int64_value), 'Map(String, Int64)') as attributes_int64," +
"CAST((attributes_float64_key, attributes_float64_value), 'Map(String, Float64)') as attributes_float64," +
"CAST((resources_string_key, resources_string_value), 'Map(String, String)') as resources_string "
)

View File

@ -60,6 +60,13 @@ type Reader interface {
GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error) GetTimeSeriesInfo(ctx context.Context) (map[string]interface{}, error)
GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error) GetSamplesInfoInLastHeartBeatInterval(ctx context.Context) (uint64, error)
// Logs
GetLogFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError)
UpdateLogField(ctx context.Context, field *model.UpdateField) *model.ApiError
GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.GetLogsResponse, *model.ApiError)
TailLogs(ctx context.Context, client *model.LogsTailClient)
AggregateLogs(ctx context.Context, params *model.LogsAggregateParams) (*model.GetLogsAggregatesResponse, *model.ApiError)
// Connection needed for rules, not ideal but required // Connection needed for rules, not ideal but required
GetConn() clickhouse.Conn GetConn() clickhouse.Conn
} }

View File

@ -320,3 +320,32 @@ type FilterSet struct {
Operator string `json:"op,omitempty"` Operator string `json:"op,omitempty"`
Items []FilterItem `json:"items"` Items []FilterItem `json:"items"`
} }
type UpdateField struct {
Name string `json:"name"`
DataType string `json:"dataType"`
Type string `json:"type"`
Selected bool `json:"selected"`
IndexType string `json:"index"`
IndexGranularity int `json:"indexGranularity"`
}
type LogsFilterParams struct {
Limit int `json:"limit"`
OrderBy string `json:"orderBy"`
Order string `json:"order"`
Query string `json:"q"`
TimestampStart uint64 `json:"timestampStart"`
TimestampEnd uint64 `json:"timestampEnd"`
IdGt string `json:"idGt"`
IdLT string `json:"idLt"`
}
type LogsAggregateParams struct {
Query string `json:"q"`
TimestampStart uint64 `json:"timestampStart"`
TimestampEnd uint64 `json:"timestampEnd"`
GroupBy string `json:"groupBy"`
Function string `json:"function"`
StepSeconds int `json:"step"`
}

View File

@ -19,18 +19,19 @@ type ApiError struct {
type ErrorType string type ErrorType string
const ( const (
ErrorNone ErrorType = "" ErrorNone ErrorType = ""
ErrorTimeout ErrorType = "timeout" ErrorTimeout ErrorType = "timeout"
ErrorCanceled ErrorType = "canceled" ErrorCanceled ErrorType = "canceled"
ErrorExec ErrorType = "execution" ErrorExec ErrorType = "execution"
ErrorBadData ErrorType = "bad_data" ErrorBadData ErrorType = "bad_data"
ErrorInternal ErrorType = "internal" ErrorInternal ErrorType = "internal"
ErrorUnavailable ErrorType = "unavailable" ErrorUnavailable ErrorType = "unavailable"
ErrorNotFound ErrorType = "not_found" ErrorNotFound ErrorType = "not_found"
ErrorNotImplemented ErrorType = "not_implemented" ErrorNotImplemented ErrorType = "not_implemented"
ErrorUnauthorized ErrorType = "unauthorized" ErrorUnauthorized ErrorType = "unauthorized"
ErrorForbidden ErrorType = "forbidden" ErrorForbidden ErrorType = "forbidden"
ErrorConflict ErrorType = "conflict" ErrorConflict ErrorType = "conflict"
ErrorStreamingNotSupported ErrorType = "streaming is not supported"
) )
type QueryDataV2 struct { type QueryDataV2 struct {
@ -276,10 +277,14 @@ type GetTTLResponseItem struct {
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"` MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"` TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"` TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"` ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"` ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"` ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"` ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
Status string `json:"status"` Status string `json:"status"`
} }
@ -407,6 +412,60 @@ func (p *MetricPoint) MarshalJSON() ([]byte, error) {
return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v}) return json.Marshal([...]interface{}{float64(p.Timestamp) / 1000, v})
} }
type ShowCreateTableStatement struct {
Statement string `json:"statement" ch:"statement"`
}
type LogField struct {
Name string `json:"name" ch:"name"`
DataType string `json:"dataType" ch:"datatype"`
Type string `json:"type"`
}
type GetFieldsResponse struct {
Selected []LogField `json:"selected"`
Interesting []LogField `json:"interesting"`
}
type GetLogsResponse struct {
Timestamp uint64 `json:"timestamp" ch:"timestamp"`
ID string `json:"id" ch:"id"`
TraceID string `json:"traceId" ch:"trace_id"`
SpanID string `json:"spanId" ch:"span_id"`
TraceFlags uint32 `json:"traceFlags" ch:"trace_flags"`
SeverityText string `json:"severityText" ch:"severity_text"`
SeverityNumber uint8 `json:"severityNumber" ch:"severity_number"`
Body string `json:"body" ch:"body"`
Resources_string map[string]string `json:"resourcesString" ch:"resources_string"`
Attributes_string map[string]string `json:"attributesString" ch:"attributes_string"`
Attributes_int64 map[string]int64 `json:"attributesInt" ch:"attributes_int64"`
Attributes_float64 map[string]float64 `json:"attributesFloat" ch:"attributes_float64"`
}
type LogsTailClient struct {
Name string
Logs chan *GetLogsResponse
Done chan *bool
Error chan error
Filter LogsFilterParams
}
type GetLogsAggregatesResponse struct {
Items map[int64]LogsAggregatesResponseItem `json:"items"`
}
type LogsAggregatesResponseItem struct {
Timestamp int64 `json:"timestamp,omitempty" `
Value interface{} `json:"value,omitempty"`
GroupBy map[string]interface{} `json:"groupBy,omitempty"`
}
type LogsAggregatesDBResponseItem struct {
Timestamp int64 `ch:"time"`
Value float64 `ch:"value"`
GroupBy string `ch:"groupBy"`
}
// MarshalJSON implements json.Marshaler. // MarshalJSON implements json.Marshaler.
func (s *ServiceItem) MarshalJSON() ([]byte, error) { func (s *ServiceItem) MarshalJSON() ([]byte, error) {
// If a service didn't not send any data in the last interval duration // If a service didn't not send any data in the last interval duration