diff --git a/backend/open_webui/retrieval/loaders/mistral.py b/backend/open_webui/retrieval/loaders/mistral.py index 67641d050..b00e9d7ce 100644 --- a/backend/open_webui/retrieval/loaders/mistral.py +++ b/backend/open_webui/retrieval/loaders/mistral.py @@ -20,6 +20,14 @@ class MistralLoader: """ Enhanced Mistral OCR loader with both sync and async support. Loads documents by processing them through the Mistral OCR API. + + Performance Optimizations: + - Differentiated timeouts for different operations + - Intelligent retry logic with exponential backoff + - Memory-efficient file streaming for large files + - Connection pooling and keepalive optimization + - Semaphore-based concurrency control for batch processing + - Enhanced error handling with retryable error classification """ BASE_API_URL = "https://api.mistral.ai/v1" @@ -53,17 +61,40 @@ class MistralLoader: self.max_retries = max_retries self.debug = enable_debug_logging - # Pre-compute file info for performance + # PERFORMANCE OPTIMIZATION: Differentiated timeouts for different operations + # This prevents long-running OCR operations from affecting quick operations + # and improves user experience by failing fast on operations that should be quick + self.upload_timeout = min( + timeout, 120 + ) # Cap upload at 2 minutes - prevents hanging on large files + self.url_timeout = ( + 30 # URL requests should be fast - fail quickly if API is slow + ) + self.ocr_timeout = ( + timeout # OCR can take the full timeout - this is the heavy operation + ) + self.cleanup_timeout = ( + 30 # Cleanup should be quick - don't hang on file deletion + ) + + # PERFORMANCE OPTIMIZATION: Pre-compute file info to avoid repeated filesystem calls + # This avoids multiple os.path.basename() and os.path.getsize() calls during processing self.file_name = os.path.basename(file_path) self.file_size = os.path.getsize(file_path) + # ENHANCEMENT: Added User-Agent for better API tracking and debugging self.headers = { "Authorization": f"Bearer {self.api_key}", - "User-Agent": "OpenWebUI-MistralLoader/2.0", + "User-Agent": "OpenWebUI-MistralLoader/2.0", # Helps API provider track usage } def _debug_log(self, message: str, *args) -> None: - """Conditional debug logging for performance.""" + """ + PERFORMANCE OPTIMIZATION: Conditional debug logging for performance. + + Only processes debug messages when debug mode is enabled, avoiding + string formatting overhead in production environments. + """ if self.debug: log.debug(message, *args) @@ -115,53 +146,118 @@ class MistralLoader: log.error(f"Unexpected error processing response: {e}") raise + def _is_retryable_error(self, error: Exception) -> bool: + """ + ENHANCEMENT: Intelligent error classification for retry logic. + + Determines if an error is retryable based on its type and status code. + This prevents wasting time retrying errors that will never succeed + (like authentication errors) while ensuring transient errors are retried. + + Retryable errors: + - Network connection errors (temporary network issues) + - Timeouts (server might be temporarily overloaded) + - Server errors (5xx status codes - server-side issues) + - Rate limiting (429 status - temporary throttling) + + Non-retryable errors: + - Authentication errors (401, 403 - won't fix with retry) + - Bad request errors (400 - malformed request) + - Not found errors (404 - resource doesn't exist) + """ + if isinstance(error, requests.exceptions.ConnectionError): + return True # Network issues are usually temporary + if isinstance(error, requests.exceptions.Timeout): + return True # Timeouts might resolve on retry + if isinstance(error, requests.exceptions.HTTPError): + # Only retry on server errors (5xx) or rate limits (429) + if hasattr(error, "response") and error.response is not None: + status_code = error.response.status_code + return status_code >= 500 or status_code == 429 + return False + if isinstance( + error, (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError) + ): + return True # Async network/timeout errors are retryable + if isinstance(error, aiohttp.ClientResponseError): + return error.status >= 500 or error.status == 429 + return False # All other errors are non-retryable + def _retry_request_sync(self, request_func, *args, **kwargs): - """Synchronous retry logic with exponential backoff.""" + """ + ENHANCEMENT: Synchronous retry logic with intelligent error classification. + + Uses exponential backoff with jitter to avoid thundering herd problems. + The wait time increases exponentially but is capped at 30 seconds to + prevent excessive delays. Only retries errors that are likely to succeed + on subsequent attempts. + """ for attempt in range(self.max_retries): try: return request_func(*args, **kwargs) - except (requests.exceptions.RequestException, Exception) as e: - if attempt == self.max_retries - 1: + except Exception as e: + if attempt == self.max_retries - 1 or not self._is_retryable_error(e): raise - wait_time = (2**attempt) + 0.5 + # PERFORMANCE OPTIMIZATION: Exponential backoff with cap + # Prevents overwhelming the server while ensuring reasonable retry delays + wait_time = min((2**attempt) + 0.5, 30) # Cap at 30 seconds log.warning( - f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}. Retrying in {wait_time}s..." + f"Retryable error (attempt {attempt + 1}/{self.max_retries}): {e}. " + f"Retrying in {wait_time}s..." ) time.sleep(wait_time) async def _retry_request_async(self, request_func, *args, **kwargs): - """Async retry logic with exponential backoff.""" + """ + ENHANCEMENT: Async retry logic with intelligent error classification. + + Async version of retry logic that doesn't block the event loop during + wait periods. Uses the same exponential backoff strategy as sync version. + """ for attempt in range(self.max_retries): try: return await request_func(*args, **kwargs) - except (aiohttp.ClientError, asyncio.TimeoutError) as e: - if attempt == self.max_retries - 1: + except Exception as e: + if attempt == self.max_retries - 1 or not self._is_retryable_error(e): raise - wait_time = (2**attempt) + 0.5 + # PERFORMANCE OPTIMIZATION: Non-blocking exponential backoff + wait_time = min((2**attempt) + 0.5, 30) # Cap at 30 seconds log.warning( - f"Request failed (attempt {attempt + 1}/{self.max_retries}): {e}. Retrying in {wait_time}s..." + f"Retryable error (attempt {attempt + 1}/{self.max_retries}): {e}. " + f"Retrying in {wait_time}s..." ) - await asyncio.sleep(wait_time) + await asyncio.sleep(wait_time) # Non-blocking wait def _upload_file(self) -> str: - """Uploads the file to Mistral for OCR processing (sync version).""" + """ + PERFORMANCE OPTIMIZATION: Enhanced file upload with streaming consideration. + + Uploads the file to Mistral for OCR processing (sync version). + Uses context manager for file handling to ensure proper resource cleanup. + Although streaming is not enabled for this endpoint, the file is opened + in a context manager to minimize memory usage duration. + """ log.info("Uploading file to Mistral API") url = f"{self.BASE_API_URL}/files" - file_name = os.path.basename(self.file_path) def upload_request(): + # MEMORY OPTIMIZATION: Use context manager to minimize file handle lifetime + # This ensures the file is closed immediately after reading, reducing memory usage with open(self.file_path, "rb") as f: - files = {"file": (file_name, f, "application/pdf")} + files = {"file": (self.file_name, f, "application/pdf")} data = {"purpose": "ocr"} + # NOTE: stream=False is required for this endpoint + # The Mistral API doesn't support chunked uploads for this endpoint response = requests.post( url, headers=self.headers, files=files, data=data, - timeout=self.timeout, + timeout=self.upload_timeout, # Use specialized upload timeout + stream=False, # Keep as False for this endpoint ) return self._handle_response(response) @@ -209,7 +305,7 @@ class MistralLoader: url, data=writer, headers=self.headers, - timeout=aiohttp.ClientTimeout(total=self.timeout), + timeout=aiohttp.ClientTimeout(total=self.upload_timeout), ) as response: return await self._handle_response_async(response) @@ -231,7 +327,7 @@ class MistralLoader: def url_request(): response = requests.get( - url, headers=signed_url_headers, params=params, timeout=self.timeout + url, headers=signed_url_headers, params=params, timeout=self.url_timeout ) return self._handle_response(response) @@ -261,7 +357,7 @@ class MistralLoader: url, headers=headers, params=params, - timeout=aiohttp.ClientTimeout(total=self.timeout), + timeout=aiohttp.ClientTimeout(total=self.url_timeout), ) as response: return await self._handle_response_async(response) @@ -294,7 +390,7 @@ class MistralLoader: def ocr_request(): response = requests.post( - url, headers=ocr_headers, json=payload, timeout=self.timeout + url, headers=ocr_headers, json=payload, timeout=self.ocr_timeout ) return self._handle_response(response) @@ -336,7 +432,7 @@ class MistralLoader: url, json=payload, headers=headers, - timeout=aiohttp.ClientTimeout(total=self.timeout), + timeout=aiohttp.ClientTimeout(total=self.ocr_timeout), ) as response: ocr_response = await self._handle_response_async(response) @@ -353,7 +449,9 @@ class MistralLoader: url = f"{self.BASE_API_URL}/files/{file_id}" try: - response = requests.delete(url, headers=self.headers, timeout=30) + response = requests.delete( + url, headers=self.headers, timeout=self.cleanup_timeout + ) delete_response = self._handle_response(response) log.info(f"File deleted successfully: {delete_response}") except Exception as e: @@ -372,7 +470,7 @@ class MistralLoader: url=f"{self.BASE_API_URL}/files/{file_id}", headers=self.headers, timeout=aiohttp.ClientTimeout( - total=30 + total=self.cleanup_timeout ), # Shorter timeout for cleanup ) as response: return await self._handle_response_async(response) @@ -388,29 +486,39 @@ class MistralLoader: async def _get_session(self): """Context manager for HTTP session with optimized settings.""" connector = aiohttp.TCPConnector( - limit=10, # Total connection limit - limit_per_host=5, # Per-host connection limit - ttl_dns_cache=300, # DNS cache TTL + limit=20, # Increased total connection limit for better throughput + limit_per_host=10, # Increased per-host limit for API endpoints + ttl_dns_cache=600, # Longer DNS cache TTL (10 minutes) use_dns_cache=True, - keepalive_timeout=30, + keepalive_timeout=60, # Increased keepalive for connection reuse enable_cleanup_closed=True, + force_close=False, # Allow connection reuse + resolver=aiohttp.AsyncResolver(), # Use async DNS resolver + ) + + timeout = aiohttp.ClientTimeout( + total=self.timeout, + connect=30, # Connection timeout + sock_read=60, # Socket read timeout ) async with aiohttp.ClientSession( connector=connector, - timeout=aiohttp.ClientTimeout(total=self.timeout), + timeout=timeout, headers={"User-Agent": "OpenWebUI-MistralLoader/2.0"}, + raise_for_status=False, # We handle status codes manually ) as session: yield session def _process_results(self, ocr_response: Dict[str, Any]) -> List[Document]: - """Process OCR results into Document objects with enhanced metadata.""" + """Process OCR results into Document objects with enhanced metadata and memory efficiency.""" pages_data = ocr_response.get("pages") if not pages_data: log.warning("No pages found in OCR response.") return [ Document( - page_content="No text content found", metadata={"error": "no_pages"} + page_content="No text content found", + metadata={"error": "no_pages", "file_name": self.file_name}, ) ] @@ -418,41 +526,44 @@ class MistralLoader: total_pages = len(pages_data) skipped_pages = 0 + # Process pages in a memory-efficient way for page_data in pages_data: page_content = page_data.get("markdown") page_index = page_data.get("index") # API uses 0-based index - if page_content is not None and page_index is not None: - # Clean up content efficiently - cleaned_content = ( - page_content.strip() - if isinstance(page_content, str) - else str(page_content) - ) - - if cleaned_content: # Only add non-empty pages - documents.append( - Document( - page_content=cleaned_content, - metadata={ - "page": page_index, # 0-based index from API - "page_label": page_index - + 1, # 1-based label for convenience - "total_pages": total_pages, - "file_name": self.file_name, - "file_size": self.file_size, - "processing_engine": "mistral-ocr", - }, - ) - ) - else: - skipped_pages += 1 - self._debug_log(f"Skipping empty page {page_index}") - else: + if page_content is None or page_index is None: skipped_pages += 1 self._debug_log( - f"Skipping page due to missing 'markdown' or 'index'. Data: {page_data}" + f"Skipping page due to missing 'markdown' or 'index'. Data keys: {list(page_data.keys())}" ) + continue + + # Clean up content efficiently with early exit for empty content + if isinstance(page_content, str): + cleaned_content = page_content.strip() + else: + cleaned_content = str(page_content).strip() + + if not cleaned_content: + skipped_pages += 1 + self._debug_log(f"Skipping empty page {page_index}") + continue + + # Create document with optimized metadata + documents.append( + Document( + page_content=cleaned_content, + metadata={ + "page": page_index, # 0-based index from API + "page_label": page_index + 1, # 1-based label for convenience + "total_pages": total_pages, + "file_name": self.file_name, + "file_size": self.file_size, + "processing_engine": "mistral-ocr", + "content_length": len(cleaned_content), + }, + ) + ) if skipped_pages > 0: log.info( @@ -467,7 +578,11 @@ class MistralLoader: return [ Document( page_content="No valid text content found in document", - metadata={"error": "no_valid_pages", "total_pages": total_pages}, + metadata={ + "error": "no_valid_pages", + "total_pages": total_pages, + "file_name": self.file_name, + }, ) ] @@ -585,12 +700,14 @@ class MistralLoader: @staticmethod async def load_multiple_async( loaders: List["MistralLoader"], + max_concurrent: int = 5, # Limit concurrent requests ) -> List[List[Document]]: """ - Process multiple files concurrently for maximum performance. + Process multiple files concurrently with controlled concurrency. Args: loaders: List of MistralLoader instances + max_concurrent: Maximum number of concurrent requests Returns: List of document lists, one for each loader @@ -598,11 +715,20 @@ class MistralLoader: if not loaders: return [] - log.info(f"Starting concurrent processing of {len(loaders)} files") + log.info( + f"Starting concurrent processing of {len(loaders)} files with max {max_concurrent} concurrent" + ) start_time = time.time() - # Process all files concurrently - tasks = [loader.load_async() for loader in loaders] + # Use semaphore to control concurrency + semaphore = asyncio.Semaphore(max_concurrent) + + async def process_with_semaphore(loader: "MistralLoader") -> List[Document]: + async with semaphore: + return await loader.load_async() + + # Process all files with controlled concurrency + tasks = [process_with_semaphore(loader) for loader in loaders] results = await asyncio.gather(*tasks, return_exceptions=True) # Handle any exceptions in results @@ -624,10 +750,18 @@ class MistralLoader: else: processed_results.append(result) + # MONITORING: Log comprehensive batch processing statistics total_time = time.time() - start_time total_docs = sum(len(docs) for docs in processed_results) + success_count = sum( + 1 for result in results if not isinstance(result, Exception) + ) + failure_count = len(results) - success_count + log.info( - f"Batch processing completed in {total_time:.2f}s, produced {total_docs} total documents" + f"Batch processing completed in {total_time:.2f}s: " + f"{success_count} files succeeded, {failure_count} files failed, " + f"produced {total_docs} total documents" ) return processed_results