live tail: fetch only the most recent 100 logs every 10s

This commit is contained in:
nityanandagohain 2022-07-25 14:42:58 +05:30
parent 4825ed6e5f
commit 0fe4327877
2 changed files with 40 additions and 32 deletions

View File

@ -18,20 +18,21 @@ const (
)
const (
defaultDatasource string = "tcp://localhost:9000"
defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "signoz_operations"
defaultIndexTable string = "signoz_index_v2"
defaultErrorTable string = "signoz_error_index_v2"
defaulDurationTable string = "durationSortMV"
defaultSpansTable string = "signoz_spans"
defaultLogsDB string = "signoz_logs"
defaultLogsTable string = "logs"
defaultLogAttributeKeysTable string = "logs_atrribute_keys"
defaultLogResourceKeysTable string = "logs_resource_keys"
defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000
defaultEncoding Encoding = EncodingJSON
defaultDatasource string = "tcp://localhost:9000"
defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "signoz_operations"
defaultIndexTable string = "signoz_index_v2"
defaultErrorTable string = "signoz_error_index_v2"
defaulDurationTable string = "durationSortMV"
defaultSpansTable string = "signoz_spans"
defaultLogsDB string = "signoz_logs"
defaultLogsTable string = "logs"
defaultLogAttributeKeysTable string = "logs_atrribute_keys"
defaultLogResourceKeysTable string = "logs_resource_keys"
defaultLiveTailRefreshSeconds int = 10
defaultWriteBatchDelay time.Duration = 5 * time.Second
defaultWriteBatchSize int = 10000
defaultEncoding Encoding = EncodingJSON
)
const (
@ -60,6 +61,7 @@ type namespaceConfig struct {
LogsTable string
LogsAttributeKeysTable string
LogsResourceKeysTable string
LiveTailRefreshSeconds int
WriteBatchDelay time.Duration
WriteBatchSize int
Encoding Encoding
@ -123,6 +125,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
LogsTable: defaultLogsTable,
LogsAttributeKeysTable: defaultLogAttributeKeysTable,
LogsResourceKeysTable: defaultLogResourceKeysTable,
LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds,
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
@ -144,6 +147,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s
LogsTable: "",
LogsAttributeKeysTable: "",
LogsResourceKeysTable: "",
LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds,
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,

View File

@ -94,6 +94,8 @@ type ClickHouseReader struct {
promConfigFile string
promConfig *config.Config
alertManager am.Manager
liveTailRefreshSeconds int
}
// NewTraceReader returns a TraceReader for the database
@ -116,20 +118,21 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader {
}
return &ClickHouseReader{
db: db,
localDB: localDB,
traceDB: options.primary.TraceDB,
alertManager: alertManager,
operationsTable: options.primary.OperationsTable,
indexTable: options.primary.IndexTable,
errorTable: options.primary.ErrorTable,
durationTable: options.primary.DurationTable,
spansTable: options.primary.SpansTable,
logsDB: options.primary.LogsDB,
logsTable: options.primary.LogsTable,
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
logsResourceKeys: options.primary.LogsResourceKeysTable,
promConfigFile: configFile,
db: db,
localDB: localDB,
traceDB: options.primary.TraceDB,
alertManager: alertManager,
operationsTable: options.primary.OperationsTable,
indexTable: options.primary.IndexTable,
errorTable: options.primary.ErrorTable,
durationTable: options.primary.DurationTable,
spansTable: options.primary.SpansTable,
logsDB: options.primary.LogsDB,
logsTable: options.primary.LogsTable,
logsAttributeKeys: options.primary.LogsAttributeKeysTable,
logsResourceKeys: options.primary.LogsResourceKeysTable,
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
promConfigFile: configFile,
}
}
@ -2912,6 +2915,7 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
zap.S().Debug("closing go routine : " + client.Name)
return
default:
// get the newest 100 logs, as anything older won't make sense for live tail
tmpQuery := fmt.Sprintf("%s where timestamp >='%d'", query, tsStart)
if filterSql != "" {
tmpQuery += fmt.Sprintf(" and %s", filterSql)
@ -2919,7 +2923,7 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
if idStart != "" {
tmpQuery += fmt.Sprintf(" and id > '%s'", idStart)
}
tmpQuery = fmt.Sprintf("%s order by timestamp asc, id asc limit 1000", tmpQuery)
tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery)
zap.S().Debug(tmpQuery)
response := []model.GetLogsResponse{}
err := r.db.Select(ctx, &response, tmpQuery)
@ -2929,7 +2933,7 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
return
}
len := len(response)
for i := 0; i < len; i++ {
for i := len - 1; i >= 0; i-- {
select {
case <-ctx.Done():
done := true
@ -2938,13 +2942,13 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC
return
default:
client.Logs <- &response[i]
if i == len-1 {
if i == 0 {
tsStart = response[i].Timestamp
idStart = response[i].ID
}
}
}
time.Sleep(10 * time.Second)
time.Sleep(time.Duration(r.liveTailRefreshSeconds) * time.Second)
}
}
}