From b5498a373a4c05bb8ddd2f4617e3bb120fa87646 Mon Sep 17 00:00:00 2001 From: Han <109904848+wanghan5@users.noreply.github.com> Date: Wed, 9 Apr 2025 10:12:16 +0800 Subject: [PATCH] Accelerate migration (#17088) Co-authored-by: Wang Han --- api/services/plugin/data_migration.py | 52 ++++++++++++++++++--------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/api/services/plugin/data_migration.py b/api/services/plugin/data_migration.py index 7228a16632..597585588b 100644 --- a/api/services/plugin/data_migration.py +++ b/api/services/plugin/data_migration.py @@ -127,18 +127,32 @@ limit 1000""" processed_count = 0 failed_ids = [] + last_id = "00000000-0000-0000-0000-000000000000" + while True: - sql = f"""select id, {provider_column_name} as provider_name from {table_name} -where {provider_column_name} not like '%/%' and {provider_column_name} is not null and {provider_column_name} != '' -limit 1000""" + sql = f""" + SELECT id, {provider_column_name} AS provider_name + FROM {table_name} + WHERE {provider_column_name} NOT LIKE '%/%' + AND {provider_column_name} IS NOT NULL + AND {provider_column_name} != '' + AND id > :last_id + ORDER BY id ASC + LIMIT 5000 + """ + params = {"last_id": last_id or ""} + with db.engine.begin() as conn: - rs = conn.execute(db.text(sql)) + rs = conn.execute(db.text(sql), params) current_iter_count = 0 + batch_updates = [] + for i in rs: current_iter_count += 1 processed_count += 1 record_id = str(i.id) + last_id = record_id provider_name = str(i.provider_name) if record_id in failed_ids: @@ -152,19 +166,9 @@ limit 1000""" ) try: - # update provider name append with "langgenius/{provider_name}/{provider_name}" - sql = f"""update {table_name} - set {provider_column_name} = - concat('{DEFAULT_PLUGIN_ID}/', {provider_column_name}, '/', {provider_column_name}) - where id = :record_id""" - conn.execute(db.text(sql), {"record_id": record_id}) - click.echo( - click.style( - f"[{processed_count}] Migrated [{table_name}] {record_id} ({provider_name})", - fg="green", - ) - ) - except Exception: + updated_value = f"{DEFAULT_PLUGIN_ID}/{provider_name}/{provider_name}" + batch_updates.append((updated_value, record_id)) + except Exception as e: failed_ids.append(record_id) click.echo( click.style( @@ -177,6 +181,20 @@ limit 1000""" ) continue + if batch_updates: + update_sql = f""" + UPDATE {table_name} + SET {provider_column_name} = :updated_value + WHERE id = :record_id + """ + conn.execute(db.text(update_sql), [{"updated_value": u, "record_id": r} for u, r in batch_updates]) + click.echo( + click.style( + f"[{processed_count}] Batch migrated [{len(batch_updates)}] records from [{table_name}]", + fg="green", + ) + ) + if not current_iter_count: break