From 35ea29b184ca4cf5b96a27429dfc62521873a2d3 Mon Sep 17 00:00:00 2001 From: Jan Kessler Date: Wed, 2 Apr 2025 21:50:00 +0200 Subject: [PATCH] prepare websocket redis sentinel code for upcoming native support of sentinel in python-socketio --- backend/open_webui/socket/main.py | 14 +----- backend/open_webui/utils/redis.py | 73 ++++--------------------------- 2 files changed, 11 insertions(+), 76 deletions(-) diff --git a/backend/open_webui/socket/main.py b/backend/open_webui/socket/main.py index 83dd74fff..c1ce42c79 100644 --- a/backend/open_webui/socket/main.py +++ b/backend/open_webui/socket/main.py @@ -9,9 +9,8 @@ from open_webui.models.users import Users, UserNameResponse from open_webui.models.channels import Channels from open_webui.models.chats import Chats from open_webui.utils.redis import ( - parse_redis_sentinel_url, get_sentinels_from_env, - AsyncRedisSentinelManager, + get_sentinel_url_from_env, ) from open_webui.env import ( @@ -38,16 +37,7 @@ log.setLevel(SRC_LOG_LEVELS["SOCKET"]) if WEBSOCKET_MANAGER == "redis": if WEBSOCKET_SENTINEL_HOSTS: - redis_config = parse_redis_sentinel_url(WEBSOCKET_REDIS_URL) - mgr = AsyncRedisSentinelManager( - WEBSOCKET_SENTINEL_HOSTS.split(","), - sentinel_port=int(WEBSOCKET_SENTINEL_PORT), - redis_port=redis_config["port"], - service=redis_config["service"], - db=redis_config["db"], - username=redis_config["username"], - password=redis_config["password"], - ) + mgr = socketio.AsyncRedisManager(get_sentinel_url_from_env(WEBSOCKET_REDIS_URL, WEBSOCKET_SENTINEL_HOSTS, WEBSOCKET_SENTINEL_PORT)) else: mgr = socketio.AsyncRedisManager(WEBSOCKET_REDIS_URL) sio = socketio.AsyncServer( diff --git a/backend/open_webui/utils/redis.py b/backend/open_webui/utils/redis.py index baccb16ad..715ac0d9b 100644 --- a/backend/open_webui/utils/redis.py +++ b/backend/open_webui/utils/redis.py @@ -4,7 +4,7 @@ from redis import asyncio as aioredis from urllib.parse import urlparse -def parse_redis_sentinel_url(redis_url): +def parse_redis_service_url(redis_url): parsed_url = urlparse(redis_url) if parsed_url.scheme != "redis": raise ValueError("Invalid Redis URL scheme. Must be 'redis'.") @@ -20,7 +20,7 @@ def parse_redis_sentinel_url(redis_url): def get_redis_connection(redis_url, redis_sentinels, decode_responses=True): if redis_sentinels: - redis_config = parse_redis_sentinel_url(redis_url) + redis_config = parse_redis_service_url(redis_url) sentinel = redis.sentinel.Sentinel( redis_sentinels, port=redis_config["port"], @@ -45,65 +45,10 @@ def get_sentinels_from_env(sentinel_hosts_env, sentinel_port_env): return [] -class AsyncRedisSentinelManager(socketio.AsyncRedisManager): - def __init__( - self, - sentinel_hosts, - sentinel_port=26379, - redis_port=6379, - service="mymaster", - db=0, - username=None, - password=None, - channel="socketio", - write_only=False, - logger=None, - redis_options=None, - ): - """ - Initialize the Redis Sentinel Manager. - This implementation mostly replicates the __init__ of AsyncRedisManager and - overrides _redis_connect() with a version that uses Redis Sentinel - - :param sentinel_hosts: List of Sentinel hosts - :param sentinel_port: Sentinel Port - :param redis_port: Redis Port (currently unsupported by aioredis!) - :param service: Master service name in Sentinel - :param db: Redis database to use - :param username: Redis username (if any) (currently unsupported by aioredis!) - :param password: Redis password (if any) - :param channel: The channel name on which the server sends and receives - notifications. Must be the same in all the servers. - :param write_only: If set to ``True``, only initialize to emit events. The - default of ``False`` initializes the class for emitting - and receiving. - :param redis_options: additional keyword arguments to be passed to - ``aioredis.from_url()``. - """ - self._sentinels = [(host, sentinel_port) for host in sentinel_hosts] - self._redis_port = redis_port - self._service = service - self._db = db - self._username = username - self._password = password - self._channel = channel - self.redis_options = redis_options or {} - - # connect and call grandparent constructor - self._redis_connect() - super(socketio.AsyncRedisManager, self).__init__( - channel=channel, write_only=write_only, logger=logger - ) - - def _redis_connect(self): - """Establish connections to Redis through Sentinel.""" - sentinel = aioredis.sentinel.Sentinel( - self._sentinels, - port=self._redis_port, - db=self._db, - password=self._password, - **self.redis_options, - ) - - self.redis = sentinel.master_for(self._service) - self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) +def get_sentinel_url_from_env(redis_url, sentinel_hosts_env, sentinel_port_env): + redis_config = parse_redis_service_url(redis_url) + username = redis_config["username"] or "" + password = redis_config["password"] or "" + auth_part = f"{username}:{password}" + hosts_part = ",".join(f"{host}:{sentinel_port_env}" for host in sentinel_hosts_env.split(",")) + return f"redis+sentinel://{auth_part}@{hosts_part}/{redis_config['db']}/{redis_config['service']}"