diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py index 982258880..8291332c0 100644 --- a/backend/open_webui/retrieval/vector/dbs/pinecone.py +++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py @@ -6,6 +6,7 @@ from pinecone import Pinecone, ServerlessSpec # Add gRPC support for better performance (Pinecone best practice) try: from pinecone.grpc import PineconeGRPC + GRPC_AVAILABLE = True except ImportError: GRPC_AVAILABLE = False @@ -60,7 +61,7 @@ class PineconeClient(VectorDBBase): self.client = PineconeGRPC( api_key=self.api_key, pool_threads=20, # Improved connection pool size - timeout=30 # Reasonable timeout for operations + timeout=30, # Reasonable timeout for operations ) self.using_grpc = True log.info("Using Pinecone gRPC client for optimal performance") @@ -69,7 +70,7 @@ class PineconeClient(VectorDBBase): self.client = Pinecone( api_key=self.api_key, pool_threads=20, # Improved connection pool size - timeout=30 # Reasonable timeout for operations + timeout=30, # Reasonable timeout for operations ) self.using_grpc = False log.info("Using Pinecone HTTP client (gRPC not available)") @@ -133,18 +134,34 @@ class PineconeClient(VectorDBBase): except Exception as e: error_str = str(e).lower() # Check if it's a retryable error (rate limits, network issues, timeouts) - is_retryable = any(keyword in error_str for keyword in [ - 'rate limit', 'quota', 'timeout', 'network', 'connection', - 'unavailable', 'internal error', '429', '500', '502', '503', '504' - ]) - + is_retryable = any( + keyword in error_str + for keyword in [ + "rate limit", + "quota", + "timeout", + "network", + "connection", + "unavailable", + "internal error", + "429", + "500", + "502", + "503", + "504", + ] + ) + if not is_retryable or attempt == max_retries - 1: # Don't retry for non-retryable errors or on final attempt raise - + # Exponential backoff with jitter - delay = (2 ** attempt) + random.uniform(0, 1) - log.warning(f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {e}") + delay = (2**attempt) + random.uniform(0, 1) + log.warning( + f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), " + f"retrying in {delay:.2f}s: {e}" + ) time.sleep(delay) def _create_points( @@ -273,7 +290,8 @@ class PineconeClient(VectorDBBase): elapsed = time.time() - start_time log.debug(f"Insert of {len(points)} vectors took {elapsed:.2f} seconds") log.info( - f"Successfully inserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'" + f"Successfully inserted {len(points)} vectors in parallel batches " + f"into '{collection_name_with_prefix}'" ) def upsert(self, collection_name: str, items: List[VectorItem]) -> None: @@ -304,7 +322,8 @@ class PineconeClient(VectorDBBase): elapsed = time.time() - start_time log.debug(f"Upsert of {len(points)} vectors took {elapsed:.2f} seconds") log.info( - f"Successfully upserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'" + f"Successfully upserted {len(points)} vectors in parallel batches " + f"into '{collection_name_with_prefix}'" ) async def insert_async(self, collection_name: str, items: List[VectorItem]) -> None: @@ -335,7 +354,8 @@ class PineconeClient(VectorDBBase): log.error(f"Error in async insert batch: {result}") raise result log.info( - f"Successfully async inserted {len(points)} vectors in batches into '{collection_name_with_prefix}'" + f"Successfully async inserted {len(points)} vectors in batches " + f"into '{collection_name_with_prefix}'" ) async def upsert_async(self, collection_name: str, items: List[VectorItem]) -> None: @@ -366,7 +386,8 @@ class PineconeClient(VectorDBBase): log.error(f"Error in async upsert batch: {result}") raise result log.info( - f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'" + f"Successfully async upserted {len(points)} vectors in batches " + f"into '{collection_name_with_prefix}'" ) def search( @@ -507,10 +528,12 @@ class PineconeClient(VectorDBBase): # This is a limitation of Pinecone - be careful with ID uniqueness self.index.delete(ids=batch_ids) log.debug( - f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'" + f"Deleted batch of {len(batch_ids)} vectors by ID " + f"from '{collection_name_with_prefix}'" ) log.info( - f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'" + f"Successfully deleted {len(ids)} vectors by ID " + f"from '{collection_name_with_prefix}'" ) elif filter: