From 4ecf2a868553ba51f4e142fb3ba83c5abb07182d Mon Sep 17 00:00:00 2001
From: PVBLIC Foundation
Date: Fri, 30 May 2025 09:33:57 -0700
Subject: [PATCH 1/2] Update pinecone.py

May 2025 Latest Pinecone Best Practices
---
 .../retrieval/vector/dbs/pinecone.py | 54 ++++++++++++++++++-
 1 file changed, 52 insertions(+), 2 deletions(-)

diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py
index 9f8abf460..982258880 100644
--- a/backend/open_webui/retrieval/vector/dbs/pinecone.py
+++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py
@@ -3,10 +3,18 @@ import logging
 import time  # for measuring elapsed time
 
 from pinecone import Pinecone, ServerlessSpec
+# Add gRPC support for better performance (Pinecone best practice)
+try:
+    from pinecone.grpc import PineconeGRPC
+    GRPC_AVAILABLE = True
+except ImportError:
+    GRPC_AVAILABLE = False
+
 import asyncio  # for async upserts
 import functools  # for partial binding in async tasks
 import concurrent.futures  # for parallel batch upserts
+import random  # for jitter in retry backoff
 
 from open_webui.retrieval.vector.main import (
     VectorDBBase,
@@ -47,7 +55,24 @@ class PineconeClient(VectorDBBase):
         self.cloud = PINECONE_CLOUD
 
         # Initialize Pinecone client for improved performance
-        self.client = Pinecone(api_key=self.api_key)
+        if GRPC_AVAILABLE:
+            # Use gRPC client for better performance (Pinecone recommendation)
+            self.client = PineconeGRPC(
+                api_key=self.api_key,
+                pool_threads=20,  # Improved connection pool size
+                timeout=30  # Reasonable timeout for operations
+            )
+            self.using_grpc = True
+            log.info("Using Pinecone gRPC client for optimal performance")
+        else:
+            # Fallback to HTTP client with enhanced connection pooling
+            self.client = Pinecone(
+                api_key=self.api_key,
+                pool_threads=20,  # Improved connection pool size
+                timeout=30  # Reasonable timeout for operations
+            )
+            self.using_grpc = False
+            log.info("Using Pinecone HTTP client (gRPC not available)")
 
         # Persistent executor for batch operations
         self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
@@ -91,12 +116,37 @@ class PineconeClient(VectorDBBase):
                 log.info(f"Using existing Pinecone index '{self.index_name}'")
 
             # Connect to the index
-            self.index = self.client.Index(self.index_name)
+            self.index = self.client.Index(
+                self.index_name,
+                pool_threads=20,  # Enhanced connection pool for index operations
+            )
 
         except Exception as e:
             log.error(f"Failed to initialize Pinecone index: {e}")
             raise RuntimeError(f"Failed to initialize Pinecone index: {e}")
 
+    def _retry_pinecone_operation(self, operation_func, max_retries=3):
+        """Retry Pinecone operations with exponential backoff for rate limits and network issues."""
+        for attempt in range(max_retries):
+            try:
+                return operation_func()
+            except Exception as e:
+                error_str = str(e).lower()
+                # Check if it's a retryable error (rate limits, network issues, timeouts)
+                is_retryable = any(keyword in error_str for keyword in [
+                    'rate limit', 'quota', 'timeout', 'network', 'connection',
+                    'unavailable', 'internal error', '429', '500', '502', '503', '504'
+                ])
+
+                if not is_retryable or attempt == max_retries - 1:
+                    # Don't retry for non-retryable errors or on final attempt
+                    raise
+
+                # Exponential backoff with jitter
+                delay = (2 ** attempt) + random.uniform(0, 1)
+                log.warning(f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {e}")
+                time.sleep(delay)
+
     def _create_points(
         self, items: List[VectorItem], collection_name_with_prefix: str
    ) -> List[Dict[str, Any]]:
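Note (reviewer sketch, not part of either patch): _retry_pinecone_operation takes a zero-argument callable, so call sites are expected to bind arguments up front, e.g. with functools.partial. A minimal illustration, assuming `client` is an initialized PineconeClient from this module; upsert_with_retry is a hypothetical helper named only for this example:

    import functools

    def upsert_with_retry(client, batch):
        # Bind the upsert arguments so the helper can re-invoke the
        # identical operation on every retry attempt.
        return client._retry_pinecone_operation(
            functools.partial(client.index.upsert, vectors=batch)
        )
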
From 66bde326230aa1069a6a73b18e00862d5be4316e Mon Sep 17 00:00:00 2001
From: PVBLIC Foundation
Date: Fri, 30 May 2025 18:47:23 -0700
Subject: [PATCH 2/2] Update pinecone.py

---
 .../retrieval/vector/dbs/pinecone.py | 55 +++++++++++++------
 1 file changed, 39 insertions(+), 16 deletions(-)

diff --git a/backend/open_webui/retrieval/vector/dbs/pinecone.py b/backend/open_webui/retrieval/vector/dbs/pinecone.py
index 982258880..8291332c0 100644
--- a/backend/open_webui/retrieval/vector/dbs/pinecone.py
+++ b/backend/open_webui/retrieval/vector/dbs/pinecone.py
@@ -6,6 +6,7 @@ from pinecone import Pinecone, ServerlessSpec
 # Add gRPC support for better performance (Pinecone best practice)
 try:
     from pinecone.grpc import PineconeGRPC
+
     GRPC_AVAILABLE = True
 except ImportError:
     GRPC_AVAILABLE = False
@@ -60,7 +61,7 @@ class PineconeClient(VectorDBBase):
             self.client = PineconeGRPC(
                 api_key=self.api_key,
                 pool_threads=20,  # Improved connection pool size
-                timeout=30  # Reasonable timeout for operations
+                timeout=30,  # Reasonable timeout for operations
             )
             self.using_grpc = True
             log.info("Using Pinecone gRPC client for optimal performance")
@@ -69,7 +70,7 @@ class PineconeClient(VectorDBBase):
             self.client = Pinecone(
                 api_key=self.api_key,
                 pool_threads=20,  # Improved connection pool size
-                timeout=30  # Reasonable timeout for operations
+                timeout=30,  # Reasonable timeout for operations
             )
             self.using_grpc = False
             log.info("Using Pinecone HTTP client (gRPC not available)")
@@ -133,18 +134,34 @@ class PineconeClient(VectorDBBase):
             except Exception as e:
                 error_str = str(e).lower()
                 # Check if it's a retryable error (rate limits, network issues, timeouts)
-                is_retryable = any(keyword in error_str for keyword in [
-                    'rate limit', 'quota', 'timeout', 'network', 'connection',
-                    'unavailable', 'internal error', '429', '500', '502', '503', '504'
-                ])
-
+                is_retryable = any(
+                    keyword in error_str
+                    for keyword in [
+                        "rate limit",
+                        "quota",
+                        "timeout",
+                        "network",
+                        "connection",
+                        "unavailable",
+                        "internal error",
+                        "429",
+                        "500",
+                        "502",
+                        "503",
+                        "504",
+                    ]
+                )
+
                 if not is_retryable or attempt == max_retries - 1:
                     # Don't retry for non-retryable errors or on final attempt
                     raise
-
+
                 # Exponential backoff with jitter
-                delay = (2 ** attempt) + random.uniform(0, 1)
-                log.warning(f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), retrying in {delay:.2f}s: {e}")
+                delay = (2**attempt) + random.uniform(0, 1)
+                log.warning(
+                    f"Pinecone operation failed (attempt {attempt + 1}/{max_retries}), "
+                    f"retrying in {delay:.2f}s: {e}"
+                )
                 time.sleep(delay)
 
     def _create_points(
@@ -273,7 +290,8 @@ class PineconeClient(VectorDBBase):
         elapsed = time.time() - start_time
         log.debug(f"Insert of {len(points)} vectors took {elapsed:.2f} seconds")
         log.info(
-            f"Successfully inserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'"
+            f"Successfully inserted {len(points)} vectors in parallel batches "
+            f"into '{collection_name_with_prefix}'"
         )
 
     def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
@@ -304,7 +322,8 @@ class PineconeClient(VectorDBBase):
         elapsed = time.time() - start_time
         log.debug(f"Upsert of {len(points)} vectors took {elapsed:.2f} seconds")
         log.info(
-            f"Successfully upserted {len(points)} vectors in parallel batches into '{collection_name_with_prefix}'"
+            f"Successfully upserted {len(points)} vectors in parallel batches "
+            f"into '{collection_name_with_prefix}'"
         )
 
     async def insert_async(self, collection_name: str, items: List[VectorItem]) -> None:
@@ -335,7 +354,8 @@ class PineconeClient(VectorDBBase):
                 log.error(f"Error in async insert batch: {result}")
                 raise result
         log.info(
-            f"Successfully async inserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
+            f"Successfully async inserted {len(points)} vectors in batches "
+            f"into '{collection_name_with_prefix}'"
         )
 
     async def upsert_async(self, collection_name: str, items: List[VectorItem]) -> None:
@@ -366,7 +386,8 @@ class PineconeClient(VectorDBBase):
                 log.error(f"Error in async upsert batch: {result}")
                 raise result
         log.info(
-            f"Successfully async upserted {len(points)} vectors in batches into '{collection_name_with_prefix}'"
+            f"Successfully async upserted {len(points)} vectors in batches "
+            f"into '{collection_name_with_prefix}'"
         )
 
     def search(
@@ -507,10 +528,12 @@ class PineconeClient(VectorDBBase):
                 # This is a limitation of Pinecone - be careful with ID uniqueness
                 self.index.delete(ids=batch_ids)
                 log.debug(
-                    f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'"
+                    f"Deleted batch of {len(batch_ids)} vectors by ID "
+                    f"from '{collection_name_with_prefix}'"
                 )
             log.info(
-                f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'"
+                f"Successfully deleted {len(ids)} vectors by ID "
+                f"from '{collection_name_with_prefix}'"
             )
         elif filter:
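
Note (illustration only): with max_retries=3, the backoff formula in _retry_pinecone_operation sleeps roughly 1-2 s after the first failed attempt and 2-3 s after the second; a third failure re-raises without sleeping. A standalone snippet that prints the schedule:

    import random

    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES - 1):  # the final attempt re-raises instead of sleeping
        delay = (2**attempt) + random.uniform(0, 1)
        print(f"attempt {attempt + 1} failed -> would sleep {delay:.2f}s")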
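
Note (usage sketch, not part of the patch): one way the async paths touched above might be driven end to end. It assumes the module's PINECONE_* settings are configured; the collection name, vector dimension, and VectorItem field values are placeholders for illustration:

    import asyncio

    from open_webui.retrieval.vector.dbs.pinecone import PineconeClient
    from open_webui.retrieval.vector.main import VectorItem

    async def main():
        client = PineconeClient()
        items = [
            VectorItem(
                id="example-1",
                text="hello world",
                vector=[0.1] * 768,  # placeholder; must match the index dimension
                metadata={"source": "example"},
            )
        ]
        # Batches are dispatched concurrently; any failed batch is re-raised.
        await client.upsert_async("example-collection", items)

    asyncio.run(main())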