From 0fe4327877c3e1382074391562e49fa61a1552d8 Mon Sep 17 00:00:00 2001 From: nityanandagohain Date: Mon, 25 Jul 2022 14:42:58 +0530 Subject: [PATCH] live tail fetch only recent 100 logs every 10s --- .../app/clickhouseReader/options.go | 32 ++++++++------- .../app/clickhouseReader/reader.go | 40 ++++++++++--------- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 930a5e9d70..aba35490cc 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -18,20 +18,21 @@ const ( ) const ( - defaultDatasource string = "tcp://localhost:9000" - defaultTraceDB string = "signoz_traces" - defaultOperationsTable string = "signoz_operations" - defaultIndexTable string = "signoz_index_v2" - defaultErrorTable string = "signoz_error_index_v2" - defaulDurationTable string = "durationSortMV" - defaultSpansTable string = "signoz_spans" - defaultLogsDB string = "signoz_logs" - defaultLogsTable string = "logs" - defaultLogAttributeKeysTable string = "logs_atrribute_keys" - defaultLogResourceKeysTable string = "logs_resource_keys" - defaultWriteBatchDelay time.Duration = 5 * time.Second - defaultWriteBatchSize int = 10000 - defaultEncoding Encoding = EncodingJSON + defaultDatasource string = "tcp://localhost:9000" + defaultTraceDB string = "signoz_traces" + defaultOperationsTable string = "signoz_operations" + defaultIndexTable string = "signoz_index_v2" + defaultErrorTable string = "signoz_error_index_v2" + defaulDurationTable string = "durationSortMV" + defaultSpansTable string = "signoz_spans" + defaultLogsDB string = "signoz_logs" + defaultLogsTable string = "logs" + defaultLogAttributeKeysTable string = "logs_atrribute_keys" + defaultLogResourceKeysTable string = "logs_resource_keys" + defaultLiveTailRefreshSeconds int = 10 + defaultWriteBatchDelay time.Duration = 5 * time.Second + defaultWriteBatchSize int = 10000 + defaultEncoding Encoding = EncodingJSON ) const ( @@ -60,6 +61,7 @@ type namespaceConfig struct { LogsTable string LogsAttributeKeysTable string LogsResourceKeysTable string + LiveTailRefreshSeconds int WriteBatchDelay time.Duration WriteBatchSize int Encoding Encoding @@ -123,6 +125,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s LogsTable: defaultLogsTable, LogsAttributeKeysTable: defaultLogAttributeKeysTable, LogsResourceKeysTable: defaultLogResourceKeysTable, + LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds, WriteBatchDelay: defaultWriteBatchDelay, WriteBatchSize: defaultWriteBatchSize, Encoding: defaultEncoding, @@ -144,6 +147,7 @@ func NewOptions(datasource string, primaryNamespace string, otherNamespaces ...s LogsTable: "", LogsAttributeKeysTable: "", LogsResourceKeysTable: "", + LiveTailRefreshSeconds: defaultLiveTailRefreshSeconds, WriteBatchDelay: defaultWriteBatchDelay, WriteBatchSize: defaultWriteBatchSize, Encoding: defaultEncoding, diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 830db0043e..acaa52e5d4 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -94,6 +94,8 @@ type ClickHouseReader struct { promConfigFile string promConfig *config.Config alertManager am.Manager + + liveTailRefreshSeconds int } // NewTraceReader returns a TraceReader for the database @@ -116,20 +118,21 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader { } return &ClickHouseReader{ - db: db, - localDB: localDB, - traceDB: options.primary.TraceDB, - alertManager: alertManager, - operationsTable: options.primary.OperationsTable, - indexTable: options.primary.IndexTable, - errorTable: options.primary.ErrorTable, - durationTable: options.primary.DurationTable, - spansTable: options.primary.SpansTable, - logsDB: options.primary.LogsDB, - logsTable: options.primary.LogsTable, - logsAttributeKeys: options.primary.LogsAttributeKeysTable, - logsResourceKeys: options.primary.LogsResourceKeysTable, - promConfigFile: configFile, + db: db, + localDB: localDB, + traceDB: options.primary.TraceDB, + alertManager: alertManager, + operationsTable: options.primary.OperationsTable, + indexTable: options.primary.IndexTable, + errorTable: options.primary.ErrorTable, + durationTable: options.primary.DurationTable, + spansTable: options.primary.SpansTable, + logsDB: options.primary.LogsDB, + logsTable: options.primary.LogsTable, + logsAttributeKeys: options.primary.LogsAttributeKeysTable, + logsResourceKeys: options.primary.LogsResourceKeysTable, + liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds, + promConfigFile: configFile, } } @@ -2912,6 +2915,7 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC zap.S().Debug("closing go routine : " + client.Name) return default: + // get the new 100 logs as anything more older won't make sense tmpQuery := fmt.Sprintf("%s where timestamp >='%d'", query, tsStart) if filterSql != "" { tmpQuery += fmt.Sprintf(" and %s", filterSql) @@ -2919,7 +2923,7 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC if idStart != "" { tmpQuery += fmt.Sprintf(" and id > '%s'", idStart) } - tmpQuery = fmt.Sprintf("%s order by timestamp asc, id asc limit 1000", tmpQuery) + tmpQuery = fmt.Sprintf("%s order by timestamp desc, id desc limit 100", tmpQuery) zap.S().Debug(tmpQuery) response := []model.GetLogsResponse{} err := r.db.Select(ctx, &response, tmpQuery) @@ -2929,7 +2933,7 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC return } len := len(response) - for i := 0; i < len; i++ { + for i := len - 1; i >= 0; i-- { select { case <-ctx.Done(): done := true @@ -2938,13 +2942,13 @@ func (r *ClickHouseReader) TailLogs(ctx context.Context, client *model.LogsTailC return default: client.Logs <- &response[i] - if i == len-1 { + if i == 0 { tsStart = response[i].Timestamp idStart = response[i].ID } } } - time.Sleep(10 * time.Second) + time.Sleep(time.Duration(r.liveTailRefreshSeconds) * time.Second) } } }