chore: add LogCommentEnricher middleware (#4681)

This commit is contained in:
Srikanth Chekuri 2024-03-12 18:39:28 +05:30 committed by GitHub
parent c9816cce18
commit adef0a4138
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 181 additions and 4 deletions

View File

@ -289,6 +289,7 @@ func (s *Server) createPrivateServer(apiHandler *api.APIHandler) (*http.Server,
r := mux.NewRouter()
r.Use(baseapp.LogCommentEnricher)
r.Use(setTimeoutMiddleware)
r.Use(s.analyticsMiddleware)
r.Use(loggingMiddlewarePrivate)
@ -321,6 +322,7 @@ func (s *Server) createPublicServer(apiHandler *api.APIHandler) (*http.Server, e
}
am := baseapp.NewAuthMiddleware(getUserFromRequest)
r.Use(baseapp.LogCommentEnricher)
r.Use(setTimeoutMiddleware)
r.Use(s.analyticsMiddleware)
r.Use(loggingMiddleware)

View File

@ -161,8 +161,10 @@ func NewReaderFromClickhouseConnection(
os.Exit(1)
}
wrap := clickhouseConnWrapper{conn: db}
return &ClickHouseReader{
db: db,
db: wrap,
localDB: localDB,
TraceDB: options.primary.TraceDB,
alertManager: alertManager,
@ -4679,10 +4681,26 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
return seriesList, nil
}
func logComment(ctx context.Context) string {
// Get the key-value pairs from context for log comment
kv := ctx.Value("log_comment")
if kv == nil {
return ""
}
logCommentKVs, ok := kv.(map[string]string)
if !ok {
return ""
}
x, _ := json.Marshal(logCommentKVs)
return string(x)
}
// GetTimeSeriesResultV3 runs the query and returns list of time series
func (r *ClickHouseReader) GetTimeSeriesResultV3(ctx context.Context, query string) ([]*v3.Series, error) {
defer utils.Elapsed("GetTimeSeriesResultV3", query)()
defer utils.Elapsed("GetTimeSeriesResultV3", query, fmt.Sprintf("logComment: %s", logComment(ctx)))()
rows, err := r.db.Query(ctx, query)
@ -4707,7 +4725,7 @@ func (r *ClickHouseReader) GetTimeSeriesResultV3(ctx context.Context, query stri
// GetListResultV3 runs the query and returns list of rows
func (r *ClickHouseReader) GetListResultV3(ctx context.Context, query string) ([]*v3.Row, error) {
defer utils.Elapsed("GetListResultV3", query)()
defer utils.Elapsed("GetListResultV3", query, fmt.Sprintf("logComment: %s", logComment(ctx)))()
rows, err := r.db.Query(ctx, query)

View File

@ -0,0 +1,82 @@
package clickhouseReader
import (
"context"
"fmt"
"strings"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
type clickhouseConnWrapper struct {
conn clickhouse.Conn
}
func (c clickhouseConnWrapper) Close() error {
return c.conn.Close()
}
func (c clickhouseConnWrapper) Ping(ctx context.Context) error {
return c.conn.Ping(ctx)
}
func (c clickhouseConnWrapper) Stats() driver.Stats {
return c.conn.Stats()
}
func (c clickhouseConnWrapper) logComment(ctx context.Context) context.Context {
// Get the key-value pairs from context for log comment
kv := ctx.Value("log_comment")
if kv == nil {
return ctx
}
logCommentKVs, ok := kv.(map[string]string)
if !ok {
return ctx
}
logComment := ""
for k, v := range logCommentKVs {
logComment += fmt.Sprintf("%s=%s, ", k, v)
}
logComment = strings.TrimSuffix(logComment, ", ")
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"log_comment": logComment,
}))
return ctx
}
func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
return c.conn.Query(c.logComment(ctx), query, args...)
}
func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
return c.conn.QueryRow(c.logComment(ctx), query, args...)
}
func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return c.conn.Select(c.logComment(ctx), dest, query, args...)
}
func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error {
return c.conn.Exec(c.logComment(ctx), query, args...)
}
func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
return c.conn.AsyncInsert(c.logComment(ctx), query, wait, args...)
}
func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
return c.conn.PrepareBatch(c.logComment(ctx), query, opts...)
}
func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) {
return c.conn.ServerVersion()
}
func (c clickhouseConnWrapper) Contributors() []string {
return c.conn.Contributors()
}

View File

@ -9,7 +9,9 @@ import (
"net"
"net/http"
_ "net/http/pprof" // http profiler
"net/url"
"os"
"strings"
"time"
"github.com/gorilla/handlers"
@ -266,6 +268,7 @@ func (s *Server) createPublicServer(api *APIHandler) (*http.Server, error) {
r := NewRouter()
r.Use(LogCommentEnricher)
r.Use(setTimeoutMiddleware)
r.Use(s.analyticsMiddleware)
r.Use(loggingMiddleware)
@ -305,6 +308,65 @@ func loggingMiddleware(next http.Handler) http.Handler {
})
}
func LogCommentEnricher(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
referrer := r.Header.Get("Referer")
var path, dashboardID, alertID, page, client, viewName, tab string
if referrer != "" {
referrerURL, _ := url.Parse(referrer)
client = "browser"
path = referrerURL.Path
if strings.Contains(path, "/dashboard") {
// Split the path into segments
pathSegments := strings.Split(referrerURL.Path, "/")
// The dashboard ID should be the segment after "/dashboard/"
// Loop through pathSegments to find "dashboard" and then take the next segment as the ID
for i, segment := range pathSegments {
if segment == "dashboard" && i < len(pathSegments)-1 {
// Return the next segment, which should be the dashboard ID
dashboardID = pathSegments[i+1]
}
}
page = "dashboards"
} else if strings.Contains(path, "/alerts") {
urlParams := referrerURL.Query()
alertID = urlParams.Get("ruleId")
page = "alerts"
} else if strings.Contains(path, "logs") && strings.Contains(path, "explorer") {
page = "logs-explorer"
viewName = referrerURL.Query().Get("viewName")
} else if strings.Contains(path, "/trace") || strings.Contains(path, "traces-explorer") {
page = "traces-explorer"
viewName = referrerURL.Query().Get("viewName")
} else if strings.Contains(path, "/services") {
page = "services"
tab = referrerURL.Query().Get("tab")
if tab == "" {
tab = "OVER_METRICS"
}
}
} else {
client = "api"
}
kvs := map[string]string{
"path": path,
"dashboardID": dashboardID,
"alertID": alertID,
"source": page,
"client": client,
"viewName": viewName,
"servicesTab": tab,
}
r = r.WithContext(context.WithValue(r.Context(), "log_comment", kvs))
next.ServeHTTP(w, r)
})
}
// loggingMiddlewarePrivate is used for logging private api calls
// from internal services like alert manager
func loggingMiddlewarePrivate(next http.Handler) http.Handler {

View File

@ -318,6 +318,13 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
rule.SetEvaluationTimestamp(t)
}(time.Now())
kvs := map[string]string{
"alertID": rule.ID(),
"source": "alerts",
"client": "query-service",
}
ctx = context.WithValue(ctx, "log_comment", kvs)
_, err := rule.Eval(ctx, ts, g.opts.Queriers)
if err != nil {
rule.SetHealth(HealthBad)

View File

@ -1,6 +1,7 @@
package utils
import (
"fmt"
"time"
"go.uber.org/zap"
@ -8,7 +9,12 @@ import (
func Elapsed(funcName string, args ...interface{}) func() {
start := time.Now()
argsStr := ""
for _, v := range args {
argsStr += fmt.Sprintf("%v, ", v)
}
argsStr = argsStr[:len(argsStr)-2]
return func() {
zap.S().Infof("func %s took %v with args %v", funcName, time.Since(start), args)
zap.S().Infof("func %s took %v with args %v", funcName, time.Since(start), string(argsStr))
}
}