query-service: add cluster name cli flag (#3713)

* chore: query-service: add cluster name cli flag

* chore: add schema migrator to docker compose file

* chore: add schema migrator to docker swarm compose file

* chore: 📌 pin versions: Schema Migrator 0.79.10 and update compose files

* chore: 🔧 update compose depends_on for schema-migrator service

---------

Co-authored-by: Prashant Shahi <prashant@signoz.io>
Authored by Dhawal Sanghvi on 2023-10-20 12:37:45 +05:30, committed by GitHub
commit ab42700245 (parent 63b503a9fb)
10 changed files with 114 additions and 34 deletions
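
Taken together, the hunks below thread one new value through three layers: a -cluster CLI flag, a Cluster field on ServerOptions, and a cluster field stored on the ClickHouse reader. A minimal, self-contained sketch of that flow (field and parameter names match the diff; everything unrelated is omitted, and main() is only for illustration):

package main

import "fmt"

// ServerOptions and ClickHouseReader are pared down to the one new field
// each gains in this commit; all pre-existing fields are omitted.
type ServerOptions struct {
	Cluster string // set from the new -cluster flag (default "cluster")
}

type ClickHouseReader struct {
	cluster string // replaces the old package-level `cluster` constant
}

// NewReader keeps only the new trailing parameter from the real signature.
func NewReader(cluster string) *ClickHouseReader {
	return &ClickHouseReader{cluster: cluster}
}

func main() {
	opts := ServerOptions{Cluster: "cluster"} // flag default
	r := NewReader(opts.Cluster)
	fmt.Println("ON CLUSTER target:", r.cluster)
}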

View File

@ -33,12 +33,14 @@ x-clickhouse-defaults: &clickhouse-defaults
soft: 262144
hard: 262144
x-clickhouse-depend: &clickhouse-depend
x-db-depend: &db-depend
depends_on:
- clickhouse
- otel-collector-migrator
# - clickhouse-2
# - clickhouse-3
services:
zookeeper-1:
image: bitnami/zookeeper:3.7.1
@ -144,7 +146,7 @@ services:
condition: on-failure
query-service:
image: signoz/query-service:0.31.1
image: signoz/query-service:0.32.0
command:
[
"-config=/root/config/prometheus.yml",
@ -181,10 +183,10 @@ services:
deploy:
restart_policy:
condition: on-failure
<<: *clickhouse-depend
<<: *db-depend
frontend:
image: signoz/frontend:0.31.1
image: signoz/frontend:0.32.0
deploy:
restart_policy:
condition: on-failure
@ -197,7 +199,7 @@ services:
- ../common/nginx-config.conf:/etc/nginx/conf.d/default.conf
otel-collector:
image: signoz/signoz-otel-collector:0.79.9
image: signoz/signoz-otel-collector:0.79.10
command:
[
"--config=/etc/otel-collector-config.yaml",
@ -231,10 +233,24 @@ services:
condition: on-failure
depends_on:
- clickhouse
- otel-collector-migrator
- query-service
otel-collector-migrator:
image: signoz/signoz-schema-migrator:0.79.10
deploy:
restart_policy:
condition: on-failure
delay: 5s
command:
- "--dsn=tcp://clickhouse:9000"
depends_on:
- clickhouse
# - clickhouse-2
# - clickhouse-3
otel-collector-metrics:
image: signoz/signoz-otel-collector:0.79.9
image: signoz/signoz-otel-collector:0.79.10
command:
[
"--config=/etc/otel-collector-metrics-config.yaml",
@ -250,7 +266,7 @@ services:
deploy:
restart_policy:
condition: on-failure
<<: *clickhouse-depend
<<: *db-depend
logspout:
image: "gliderlabs/logspout:v3.2.14"

View File

@ -65,10 +65,23 @@ services:
- --queryService.url=http://query-service:8085
- --storage.path=/data
otel-collector-migrator:
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.79.10}
container_name: otel-migrator
command:
- "--dsn=tcp://clickhouse:9000"
depends_on:
clickhouse:
condition: service_healthy
# clickhouse-2:
# condition: service_healthy
# clickhouse-3:
# condition: service_healthy
# Notes for Maintainers/Contributors who will change Line Numbers of Frontend & Query-Section. Please Update Line Numbers in `./scripts/commentLinesForSetup.sh` & `./CONTRIBUTING.md`
otel-collector:
container_name: signoz-otel-collector
image: signoz/signoz-otel-collector:0.79.9
image: signoz/signoz-otel-collector:0.79.10
command:
[
"--config=/etc/otel-collector-config.yaml",
@ -98,12 +111,14 @@ services:
depends_on:
clickhouse:
condition: service_healthy
otel-collector-migrator:
condition: service_completed_successfully
query-service:
condition: service_healthy
otel-collector-metrics:
container_name: signoz-otel-collector-metrics
image: signoz/signoz-otel-collector:0.79.8
image: signoz/signoz-otel-collector:0.79.10
command:
[
"--config=/etc/otel-collector-metrics-config.yaml",
@ -120,6 +135,8 @@ services:
depends_on:
clickhouse:
condition: service_healthy
otel-collector-migrator:
condition: service_completed_successfully
logspout:
image: "gliderlabs/logspout:v3.2.14"

View File

@ -32,10 +32,12 @@ x-clickhouse-defaults: &clickhouse-defaults
soft: 262144
hard: 262144
x-clickhouse-depend: &clickhouse-depend
x-db-depend: &db-depend
depends_on:
clickhouse:
condition: service_healthy
otel-collector-migrator:
condition: service_completed_successfully
# clickhouse-2:
# condition: service_healthy
# clickhouse-3:
@ -198,7 +200,7 @@ services:
interval: 30s
timeout: 5s
retries: 3
<<: *clickhouse-depend
<<: *db-depend
frontend:
image: signoz/frontend:${DOCKER_TAG:-0.31.1}
@ -212,8 +214,22 @@ services:
volumes:
- ../common/nginx-config.conf:/etc/nginx/conf.d/default.conf
otel-collector-migrator:
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.79.10}
container_name: otel-migrator
command:
- "--dsn=tcp://clickhouse:9000"
depends_on:
clickhouse:
condition: service_healthy
# clickhouse-2:
# condition: service_healthy
# clickhouse-3:
# condition: service_healthy
otel-collector:
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.79.9}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.79.10}
container_name: signoz-otel-collector
command:
[
@ -247,11 +263,13 @@ services:
depends_on:
clickhouse:
condition: service_healthy
otel-collector-migrator:
condition: service_completed_successfully
query-service:
condition: service_healthy
otel-collector-metrics:
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.79.8}
image: signoz/signoz-otel-collector:${OTELCOL_TAG:-0.79.10}
container_name: signoz-otel-collector-metrics
command:
[
@ -266,7 +284,7 @@ services:
# - "13133:13133" # Health check extension
# - "55679:55679" # zPages extension
restart: on-failure
<<: *clickhouse-depend
<<: *db-depend
logspout:
image: "gliderlabs/logspout:v3.2.14"

View File

@ -24,8 +24,9 @@ func NewDataConnector(
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout)
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster)
return &ClickhouseReader{
conn: ch.GetConn(),
appdb: localDB,

View File

@ -67,6 +67,7 @@ type ServerOptions struct {
DialTimeout time.Duration
CacheConfigPath string
FluxInterval string
Cluster string
}
// Server runs HTTP api service
@ -139,6 +140,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.MaxIdleConns,
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
)
go qb.Start(readerReady)
reader = qb

View File

@ -81,6 +81,7 @@ func main() {
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL string
var cluster string
var cacheConfigPath, fluxInterval string
var enableQueryServiceLogOTLPExport bool
@ -103,6 +104,7 @@ func main() {
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(cache config to use)")
flag.BoolVar(&enableQueryServiceLogOTLPExport, "enable.query.service.log.otlp.export", false, "(enable query service log otlp export)")
flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
flag.Parse()
@ -128,6 +130,7 @@ func main() {
DialTimeout: dialTimeout,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
Cluster: cluster,
}
// Read the jwt secret key

View File

@ -55,7 +55,6 @@ import (
)
const (
cluster = "cluster"
primaryNamespace = "clickhouse"
archiveNamespace = "clickhouse-archive"
signozTraceDBName = "signoz_traces"
@ -116,6 +115,7 @@ type ClickHouseReader struct {
featureFlags interfaces.FeatureLookup
liveTailRefreshSeconds int
cluster string
}
// NewTraceReader returns a TraceReader for the database
@ -126,6 +126,7 @@ func NewReader(
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
) *ClickHouseReader {
datasource := os.Getenv("ClickHouseUrl")
@ -168,6 +169,7 @@ func NewReader(
liveTailRefreshSeconds: options.primary.LiveTailRefreshSeconds,
promConfigFile: configFile,
featureFlags: featureFlag,
cluster: cluster,
}
}
@ -2287,7 +2289,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
}
req := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
tableName, cluster, params.DelDuration)
tableName, r.cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
@ -2342,7 +2344,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
}
req := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
"INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration)
"INTERVAL %v SECOND DELETE", tableName, r.cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
@ -2396,7 +2398,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
}
req := fmt.Sprintf(
"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp / 1000000000) + "+
"INTERVAL %v SECOND DELETE", tableName, cluster, params.DelDuration)
"INTERVAL %v SECOND DELETE", tableName, r.cluster, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
@ -2502,7 +2504,7 @@ func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string,
// Set the storage policy for the required table. If it is already set, then setting it again
// will not be a problem.
if len(coldStorageVolume) > 0 {
policyReq := fmt.Sprintf("ALTER TABLE %s ON CLUSTER %s MODIFY SETTING storage_policy='tiered'", tableName, cluster)
policyReq := fmt.Sprintf("ALTER TABLE %s ON CLUSTER %s MODIFY SETTING storage_policy='tiered'", tableName, r.cluster)
zap.S().Debugf("Executing Storage policy request: %s\n", policyReq)
if err := r.db.Exec(ctx, policyReq); err != nil {
@ -3480,7 +3482,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
// create materialized column
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED %s[indexOf(%s, '%s')] CODEC(ZSTD(1))",
r.logsDB, r.logsLocalTable,
cluster,
r.cluster,
colname, field.DataType,
valueColName,
keyColName,
@ -3493,7 +3495,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s %s MATERIALIZED -1",
r.logsDB, r.logsTable,
cluster,
r.cluster,
colname, field.DataType,
)
err = r.db.Exec(ctx, query)
@ -3504,7 +3506,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
// create exists column
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED if(indexOf(%s, '%s') != 0, true, false) CODEC(ZSTD(1))",
r.logsDB, r.logsLocalTable,
cluster,
r.cluster,
colname,
keyColName,
field.Name,
@ -3516,7 +3518,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS %s_exists bool MATERIALIZED false",
r.logsDB, r.logsTable,
cluster,
r.cluster,
colname,
)
err = r.db.Exec(ctx, query)
@ -3533,7 +3535,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
}
query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS %s_idx (%s) TYPE %s GRANULARITY %d",
r.logsDB, r.logsLocalTable,
cluster,
r.cluster,
colname,
colname,
field.IndexType,
@ -3546,7 +3548,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
} else {
// Delete the index first
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsLocalTable, cluster, colname)
query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s DROP INDEX IF EXISTS %s_idx", r.logsDB, r.logsLocalTable, r.cluster, colname)
err := r.db.Exec(ctx, query)
if err != nil {
return &model.ApiError{Err: err, Typ: model.ErrorInternal}
@ -3557,7 +3559,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
query := "ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS %s "
err := r.db.Exec(ctx, fmt.Sprintf(query,
r.logsDB, table,
cluster,
r.cluster,
colname,
),
)
@ -3569,7 +3571,7 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda
query = "ALTER TABLE %s.%s ON CLUSTER %s DROP COLUMN IF EXISTS %s_exists "
err = r.db.Exec(ctx, fmt.Sprintf(query,
r.logsDB, table,
cluster,
r.cluster,
colname,
),
)

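The hunks above replace the package-level cluster constant with the per-instance r.cluster field everywhere ON CLUSTER DDL is built. A minimal sketch of the resulting pattern (the ttlDDL helper, table name, and TTL value are made up for illustration; only the format string mirrors the diff):

package main

import "fmt"

// ClickHouseReader is reduced to the one field relevant here; ttlDDL is a
// hypothetical helper that mirrors the SetTTL format string from the diff.
type ClickHouseReader struct {
	cluster string
}

func (r *ClickHouseReader) ttlDDL(tableName string, delSeconds int) string {
	return fmt.Sprintf(
		"ALTER TABLE %v ON CLUSTER %s MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
		tableName, r.cluster, delSeconds,
	)
}

func main() {
	// Illustrative values only: the table name and TTL are placeholders.
	r := &ClickHouseReader{cluster: "cluster"}
	fmt.Println(r.ttlDDL("signoz_traces.my_table", 3600))
}
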
View File

@ -58,6 +58,7 @@ type ServerOptions struct {
DialTimeout time.Duration
CacheConfigPath string
FluxInterval string
Cluster string
}
// Server runs HTTP, Mux and a grpc server
@ -119,6 +120,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
serverOptions.MaxIdleConns,
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
)
go clickhouseReader.Start(readerReady)
reader = clickhouseReader

View File

@ -34,6 +34,7 @@ func main() {
// the url used to build link in the alert messages in slack and other systems
var ruleRepoURL, cacheConfigPath, fluxInterval string
var cluster string
var preferDelta bool
var preferSpanMetrics bool
@ -53,6 +54,7 @@ func main() {
flag.StringVar(&ruleRepoURL, "rules.repo-url", constants.AlertHelpPage, "(host address used to build rule link in alert messages)")
flag.StringVar(&cacheConfigPath, "experimental.cache-config", "", "(cache config to use)")
flag.StringVar(&fluxInterval, "flux-interval", "5m", "(cache config to use)")
flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
flag.Parse()
loggerMgr := initZapLog()
@ -76,6 +78,7 @@ func main() {
DialTimeout: dialTimeout,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
Cluster: cluster,
}
// Read the jwt secret key

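The community binary gains the same flag as the enterprise one above. A stand-alone sketch of the flag's behaviour, assuming only the standard library (the flag name, default, and description are taken from the hunk above; the prod_cluster value is invented for the example):

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Same declaration as in the hunk above; the default stays "cluster",
	// which matches the value that was previously hard-coded, so deployments
	// that never pass the flag keep their current behaviour.
	var cluster string
	flag.StringVar(&cluster, "cluster", "cluster", "(cluster name - defaults to 'cluster')")
	flag.Parse()

	// e.g. `query-service -cluster=prod_cluster` -> cluster == "prod_cluster"
	fmt.Println("using ClickHouse cluster:", cluster)
}
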
View File

@ -31,10 +31,12 @@ x-clickhouse-defaults: &clickhouse-defaults
soft: 262144
hard: 262144
x-clickhouse-depends: &clickhouse-depends
x-db-depend: &db-depend
depends_on:
clickhouse:
condition: service_healthy
otel-collector-migrator:
condition: service_completed_successfully
# clickhouse-2:
# condition: service_healthy
# clickhouse-3:
@ -187,10 +189,23 @@ services:
interval: 30s
timeout: 5s
retries: 3
<<: *clickhouse-depends
<<: *db-depend
otel-collector-migrator:
image: signoz/signoz-schema-migrator:${OTELCOL_TAG:-0.79.10}
container_name: otel-migrator
command:
- "--dsn=tcp://clickhouse:9000"
depends_on:
clickhouse:
condition: service_healthy
# clickhouse-2:
# condition: service_healthy
# clickhouse-3:
# condition: service_healthy
otel-collector:
image: signoz/signoz-otel-collector:0.79.9
image: signoz/signoz-otel-collector:0.79.10
container_name: signoz-otel-collector
command:
[
@ -224,12 +239,13 @@ services:
depends_on:
clickhouse:
condition: service_healthy
otel-collector-migrator:
condition: service_completed_successfully
query-service:
condition: service_healthy
otel-collector-metrics:
image: signoz/signoz-otel-collector:0.79.8
image: signoz/signoz-otel-collector:0.79.10
container_name: signoz-otel-collector-metrics
command:
[
@ -244,7 +260,7 @@ services:
# - "13133:13133" # Health check extension
# - "55679:55679" # zPages extension
restart: on-failure
<<: *clickhouse-depends
<<: *db-depend
logspout:
image: "gliderlabs/logspout:v3.2.14"