Fix get_unacked_iterator (#6280)

### What problem does this PR solve?

Fix get_unacked_iterator. Close #6132 

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
Zhichang Yu 2025-03-19 17:46:58 +08:00 committed by GitHub
parent 9cad60fa6d
commit bb869aca33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 14 additions and 9 deletions

View File

@ -592,6 +592,7 @@ async def handle_task():
global DONE_TASKS, FAILED_TASKS
redis_msg, task = await collect()
if not task:
await trio.sleep(5)
return
try:
logging.info(f"handle_task begin for task {json.dumps(task)}")

View File

@ -209,7 +209,7 @@ class RedisDB:
"""https://redis.io/docs/latest/commands/xreadgroup/"""
try:
group_info = self.REDIS.xinfo_groups(queue_name)
if not any(e["name"] == group_name for e in group_info):
if not any(gi["name"] == group_name for gi in group_info):
self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True)
args = {
"groupname": group_name,
@ -228,7 +228,7 @@ class RedisDB:
res = RedisMsg(self.REDIS, queue_name, group_name, msg_id, payload)
return res
except Exception as e:
if "key" in str(e):
if str(e) == 'no such key':
pass
else:
logging.exception(
@ -242,8 +242,14 @@ class RedisDB:
def get_unacked_iterator(self, queue_names: list[str], group_name, consumer_name):
try:
for queue_name in queue_names:
group_info = self.REDIS.xinfo_groups(queue_name)
if not any(e["name"] == group_name for e in group_info):
try:
group_info = self.REDIS.xinfo_groups(queue_name)
except Exception as e:
if str(e) == 'no such key':
logging.warning(f"RedisDB.get_unacked_iterator queue {queue_name} doesn't exist")
continue
if not any(gi["name"] == group_name for gi in group_info):
logging.warning(f"RedisDB.get_unacked_iterator queue {queue_name} group {group_name} doesn't exist")
continue
current_min = 0
while True:
@ -251,13 +257,11 @@ class RedisDB:
if not payload:
break
current_min = payload.get_msg_id()
logging.info(f"RedisDB.get_unacked_iterator {consumer_name} msg_id {current_min}")
logging.info(f"RedisDB.get_unacked_iterator {queue_name} {consumer_name} {current_min}")
yield payload
except Exception as e:
if "key" in str(e):
return
except Exception:
logging.exception(
"RedisDB.get_unacked_iterator " + consumer_name + " got exception: "
"RedisDB.get_unacked_iterator got exception: "
)
self.__open__()