diff --git a/api/tasks/add_document_to_index_task.py b/api/tasks/add_document_to_index_task.py index bd7fcdadea..c5a5ddaadc 100644 --- a/api/tasks/add_document_to_index_task.py +++ b/api/tasks/add_document_to_index_task.py @@ -21,7 +21,7 @@ def add_document_to_index_task(dataset_document_id: str): Async Add document to index :param dataset_document_id: - Usage: add_document_to_index.delay(dataset_document_id) + Usage: add_document_to_index_task.delay(dataset_document_id) """ logging.info(click.style("Start add document to index: {}".format(dataset_document_id), fg="green")) start_at = time.perf_counter() diff --git a/api/tasks/batch_clean_document_task.py b/api/tasks/batch_clean_document_task.py index 3bae82a5e3..8376ab1b03 100644 --- a/api/tasks/batch_clean_document_task.py +++ b/api/tasks/batch_clean_document_task.py @@ -21,7 +21,7 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form :param doc_form: doc_form :param file_ids: file ids - Usage: clean_document_task.delay(document_id, dataset_id) + Usage: batch_clean_document_task.delay(document_ids, dataset_id) """ logging.info(click.style("Start batch clean documents when documents deleted", fg="green")) start_at = time.perf_counter() diff --git a/api/tasks/batch_create_segment_to_index_task.py b/api/tasks/batch_create_segment_to_index_task.py index 40356b9731..648f92b0f8 100644 --- a/api/tasks/batch_create_segment_to_index_task.py +++ b/api/tasks/batch_create_segment_to_index_task.py @@ -35,7 +35,7 @@ def batch_create_segment_to_index_task( :param tenant_id: :param user_id: - Usage: batch_create_segment_to_index_task.delay(segment_id) + Usage: batch_create_segment_to_index_task.delay(job_id, content, dataset_id, document_id, tenant_id, user_id) """ logging.info(click.style("Start batch create segment jobId: {}".format(job_id), fg="green")) start_at = time.perf_counter() diff --git a/api/tasks/delete_segment_from_index_task.py b/api/tasks/delete_segment_from_index_task.py index 3b04143dd9..e4fbd5465e 100644 --- a/api/tasks/delete_segment_from_index_task.py +++ b/api/tasks/delete_segment_from_index_task.py @@ -17,7 +17,7 @@ def delete_segment_from_index_task(index_node_ids: list, dataset_id: str, docume :param dataset_id: :param document_id: - Usage: delete_segment_from_index_task.delay(segment_ids) + Usage: delete_segment_from_index_task.delay(index_node_ids, dataset_id, document_id) """ logging.info(click.style("Start delete segment from index", fg="green")) start_at = time.perf_counter() diff --git a/api/tasks/disable_segments_from_index_task.py b/api/tasks/disable_segments_from_index_task.py index 67112666e7..d43fb90ed3 100644 --- a/api/tasks/disable_segments_from_index_task.py +++ b/api/tasks/disable_segments_from_index_task.py @@ -15,7 +15,9 @@ from models.dataset import Document as DatasetDocument def disable_segments_from_index_task(segment_ids: list, dataset_id: str, document_id: str): """ Async disable segments from index - :param segment_ids: + :param segment_ids: list of segment ids + :param dataset_id: dataset id + :param document_id: document id Usage: disable_segments_from_index_task.delay(segment_ids, dataset_id, document_id) """ diff --git a/api/tasks/document_indexing_task.py b/api/tasks/document_indexing_task.py index 21b571b6cb..a8e3a69f19 100644 --- a/api/tasks/document_indexing_task.py +++ b/api/tasks/document_indexing_task.py @@ -19,7 +19,7 @@ def document_indexing_task(dataset_id: str, document_ids: list): :param dataset_id: :param document_ids: - Usage: document_indexing_task.delay(dataset_id, document_id) + Usage: document_indexing_task.delay(dataset_id, document_ids) """ documents = [] start_at = time.perf_counter() diff --git a/api/tasks/duplicate_document_indexing_task.py b/api/tasks/duplicate_document_indexing_task.py index 8e1d2b6b5d..8cf34fb66a 100644 --- a/api/tasks/duplicate_document_indexing_task.py +++ b/api/tasks/duplicate_document_indexing_task.py @@ -20,7 +20,7 @@ def duplicate_document_indexing_task(dataset_id: str, document_ids: list): :param dataset_id: :param document_ids: - Usage: duplicate_document_indexing_task.delay(dataset_id, document_id) + Usage: duplicate_document_indexing_task.delay(dataset_id, document_ids) """ documents = [] start_at = time.perf_counter() diff --git a/api/tasks/enable_segments_to_index_task.py b/api/tasks/enable_segments_to_index_task.py index 0864e05e25..9fb1e3416a 100644 --- a/api/tasks/enable_segments_to_index_task.py +++ b/api/tasks/enable_segments_to_index_task.py @@ -18,9 +18,11 @@ from models.dataset import Document as DatasetDocument def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_id: str): """ Async enable segments to index - :param segment_ids: + :param segment_ids: list of segment ids + :param dataset_id: dataset id + :param document_id: document id - Usage: enable_segments_to_index_task.delay(segment_ids) + Usage: enable_segments_to_index_task.delay(segment_ids, dataset_id, document_id) """ start_at = time.perf_counter() dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() diff --git a/api/tasks/external_document_indexing_task.py b/api/tasks/external_document_indexing_task.py deleted file mode 100644 index a45b3030bf..0000000000 --- a/api/tasks/external_document_indexing_task.py +++ /dev/null @@ -1,91 +0,0 @@ -import json -import logging -import time - -import click -from celery import shared_task # type: ignore - -from core.indexing_runner import DocumentIsPausedError -from extensions.ext_database import db -from extensions.ext_storage import storage -from models.dataset import Dataset, ExternalKnowledgeApis -from models.model import UploadFile -from services.external_knowledge_service import ExternalDatasetService - - -@shared_task(queue="dataset") -def external_document_indexing_task( - dataset_id: str, external_knowledge_api_id: str, data_source: dict, process_parameter: dict -): - """ - Async process document - :param dataset_id: - :param external_knowledge_api_id: - :param data_source: - :param process_parameter: - Usage: external_document_indexing_task.delay(dataset_id, document_id) - """ - start_at = time.perf_counter() - - dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() - if not dataset: - logging.info( - click.style("Processed external dataset: {} failed, dataset not exit.".format(dataset_id), fg="red") - ) - return - - # get external api template - external_knowledge_api = ( - db.session.query(ExternalKnowledgeApis) - .filter( - ExternalKnowledgeApis.id == external_knowledge_api_id, ExternalKnowledgeApis.tenant_id == dataset.tenant_id - ) - .first() - ) - - if not external_knowledge_api: - logging.info( - click.style( - "Processed external dataset: {} failed, api template: {} not exit.".format( - dataset_id, external_knowledge_api_id - ), - fg="red", - ) - ) - return - files = {} - if data_source["type"] == "upload_file": - upload_file_list = data_source["info_list"]["file_info_list"]["file_ids"] - for file_id in upload_file_list: - file = ( - db.session.query(UploadFile) - .filter(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id) - .first() - ) - if file: - files[file.id] = (file.name, storage.load_once(file.key), file.mime_type) - try: - settings = ExternalDatasetService.get_external_knowledge_api_settings( - json.loads(external_knowledge_api.settings) - ) - - # do http request - response = ExternalDatasetService.process_external_api(settings, files) - job_id = response.json().get("job_id") - if job_id: - # save job_id to dataset - dataset.job_id = job_id - db.session.commit() - - end_at = time.perf_counter() - logging.info( - click.style( - "Processed external dataset: {} successful, latency: {}".format(dataset.id, end_at - start_at), - fg="green", - ) - ) - except DocumentIsPausedError as ex: - logging.info(click.style(str(ex), fg="yellow")) - - except Exception: - pass diff --git a/api/tasks/retry_document_indexing_task.py b/api/tasks/retry_document_indexing_task.py index 74fd542f6c..2f4f14b21f 100644 --- a/api/tasks/retry_document_indexing_task.py +++ b/api/tasks/retry_document_indexing_task.py @@ -20,7 +20,7 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str]): :param dataset_id: :param document_ids: - Usage: retry_document_indexing_task.delay(dataset_id, document_id) + Usage: retry_document_indexing_task.delay(dataset_id, document_ids) """ documents: list[Document] = [] start_at = time.perf_counter()