diff --git a/apps/api/src/scraper/WebScraper/sitemap-index.ts b/apps/api/src/scraper/WebScraper/sitemap-index.ts
index dec76b11..4a97f408 100644
--- a/apps/api/src/scraper/WebScraper/sitemap-index.ts
+++ b/apps/api/src/scraper/WebScraper/sitemap-index.ts
@@ -1,5 +1,5 @@
 import { logger } from "../../lib/logger";
-import { normalizeUrlOnlyHostname } from "../../lib/canonical-url";
+import { normalizeUrl, normalizeUrlOnlyHostname } from "../../lib/canonical-url";
 import { supabase_service } from "../../services/supabase";
 
 /**
@@ -23,7 +23,7 @@ async function querySitemapIndexFunction(url: string) {
         throw error;
       }
 
-      const allUrls = data.map((entry) => entry.urls).flat();
+      const allUrls = [...new Set(data.map((entry) => entry.urls).flat().map(url => normalizeUrl(url)))];
 
       return allUrls;
     } catch (error) {
diff --git a/apps/api/src/services/indexing/crawl-maps-index.ts b/apps/api/src/services/indexing/crawl-maps-index.ts
index 2687e814..fb6388f5 100644
--- a/apps/api/src/services/indexing/crawl-maps-index.ts
+++ b/apps/api/src/services/indexing/crawl-maps-index.ts
@@ -18,6 +18,15 @@ interface CrawlMapOperation {
   timestamp: string;
 }
 
+interface CrawlMapRecord {
+  id?: string;
+  origin_url: string;
+  urls: string[];
+  num_urls: number;
+  updated_at: string;
+  created_at?: string;
+}
+
 async function acquireLock(): Promise<boolean> {
   const redis = redisConnection;
   // Set lock with NX (only if it doesn't exist) and PX (millisecond expiry)
@@ -65,51 +74,83 @@ async function processBatch() {
     const origins = operations.map((op) => op.originUrl);
     const { data: existingMaps } = await supabase_service
       .from("crawl_maps")
-      .select("origin_url, urls")
-      .in("origin_url", origins);
+      .select("id, origin_url, urls, updated_at")
+      .in("origin_url", origins)
+      .order("updated_at", { ascending: false });
 
-    const existingMapsByOrigin = new Map(
-      existingMaps?.map((map) => [map.origin_url, map.urls]) || [],
-    );
-
-    // Prepare updates and inserts
-    interface CrawlMapRecord {
-      origin_url: string;
-      urls: string[];
-      num_urls: number;
-      updated_at: string;
-      created_at?: string;
-    }
+    // Group maps by origin and handle duplicates
+    const mapsByOrigin = new Map();
+    existingMaps?.forEach((map) => {
+      const maps = mapsByOrigin.get(map.origin_url) || [];
+      maps.push(map);
+      mapsByOrigin.set(map.origin_url, maps);
+    });
 
+    // Handle duplicates and prepare updates
     const updates: CrawlMapRecord[] = [];
     const inserts: CrawlMapRecord[] = [];
+    const duplicatesToDelete: string[] = [];
 
     for (const op of operations) {
-      const existingUrls = existingMapsByOrigin.get(op.originUrl);
+      const existingForOrigin = mapsByOrigin.get(op.originUrl) || [];
 
-      if (existingUrls) {
-        // Merge URLs for update
+      if (existingForOrigin.length > 0) {
+        // Keep most recent entry and mark others for deletion
+        const [mostRecent, ...duplicates] = existingForOrigin;
+        if (duplicates.length > 0) {
+          duplicatesToDelete.push(...duplicates.map(d => d.id));
+        }
+
+        // Merge and deduplicate URLs
         const mergedUrls = [
-          ...new Set([...existingUrls, ...op.standardizedUrls]),
+          ...new Set([
+            ...mostRecent.urls,
+            ...op.standardizedUrls.map(url => normalizeUrl(url))
+          ])
         ];
+
         updates.push({
+          id: mostRecent.id, // Add id to ensure we update the correct record
          origin_url: op.originUrl,
           urls: mergedUrls,
           num_urls: mergedUrls.length,
           updated_at: op.timestamp,
         });
       } else {
-        // Prepare insert
+        // Prepare insert with deduplicated URLs
+        const deduplicatedUrls = [...new Set(op.standardizedUrls.map(url => normalizeUrl(url)))];
         inserts.push({
           origin_url: op.originUrl,
-          urls: op.standardizedUrls,
-          num_urls: op.standardizedUrls.length,
+          urls: deduplicatedUrls,
+          num_urls: deduplicatedUrls.length,
           created_at: op.timestamp,
           updated_at: op.timestamp,
         });
       }
     }
 
+    // Delete duplicate entries
+    if (duplicatesToDelete.length > 0) {
+      logger.info(`🗑️ Deleting ${duplicatesToDelete.length} duplicate crawl maps in batches of 100`);
+
+      // Delete in batches of 100
+      for (let i = 0; i < duplicatesToDelete.length; i += 100) {
+        const batch = duplicatesToDelete.slice(i, i + 100);
+        const { error: deleteError } = await supabase_service
+          .from("crawl_maps")
+          .delete()
+          .in("id", batch);
+
+        if (deleteError) {
+          logger.error(`Failed to delete batch ${i/100 + 1} of duplicate crawl maps`, {
+            error: deleteError,
+            batchSize: batch.length,
+            startIndex: i
+          });
+        }
+      }
+    }
+
     // Execute batch operations
     if (updates.length > 0) {
       logger.info(`🔄 Updating ${updates.length} existing crawl maps`, {
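
Note: both files now lean on the same normalize-then-deduplicate pattern, mapping every URL through normalizeUrl from lib/canonical-url before collapsing duplicates with a Set. The diff does not show normalizeUrl itself, so the snippet below is only a rough sketch of the assumed behaviour (the fragment-stripping and trailing-slash rules are illustrative assumptions, not the library's confirmed implementation), included to show why normalizing before building the Set removes near-duplicate URLs.

// Illustrative stand-in for normalizeUrl (the real helper lives in lib/canonical-url
// and may apply different rules). Shown only to demonstrate the dedupe pattern above.
function normalizeUrlSketch(url: string): string {
  const u = new URL(url);
  u.hash = ""; // assumption: fragments are ignored for canonical comparison
  let out = u.toString();
  if (out.endsWith("/")) out = out.slice(0, -1); // assumption: trailing slash dropped
  return out;
}

const standardizedUrls = [
  "https://example.com/page",
  "https://example.com/page/",
  "https://example.com/page#section",
];

// Same shape as in the diff: normalize first, then let Set collapse the variants.
const deduplicated = [...new Set(standardizedUrls.map((u) => normalizeUrlSketch(u)))];
console.log(deduplicated); // ["https://example.com/page"]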