import logging
import re
from typing import Optional, List, Tuple, cast

from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.helpers import detect_file_encodings
from langchain.schema import Document

logger = logging.getLogger(__name__)


class MarkdownLoader(BaseLoader):
    """Load md files.

    The file is split into one section per markdown header (a line starting
    with one or more ``#`` followed by whitespace); each section becomes one
    ``Document``.

    Args:
        file_path: Path to the file to load.
        remove_hyperlinks: Whether to remove hyperlinks from the text.
        remove_images: Whether to remove images from the text.
        encoding: File encoding to use. If `None`, the file will be loaded
            with the default system encoding.
        autodetect_encoding: Whether to try to autodetect the file encoding
            if the specified encoding fails.
    """

    def __init__(
        self,
        file_path: str,
        remove_hyperlinks: bool = True,
        remove_images: bool = True,
        encoding: Optional[str] = None,
        autodetect_encoding: bool = True,
    ):
        """Initialize with file path."""
        self._file_path = file_path
        self._remove_hyperlinks = remove_hyperlinks
        self._remove_images = remove_images
        self._encoding = encoding
        self._autodetect_encoding = autodetect_encoding

    def load(self) -> List[Document]:
        """Read the file and return one ``Document`` per markdown section.

        Sections without a header contain only their stripped text; sections
        with a header are rendered as ``"\\n\\n{header}\\n{text}"``.

        Raises:
            RuntimeError: If the file cannot be read or decoded.
        """
        documents: List[Document] = []
        for header, value in self.parse_tups(self._file_path):
            value = value.strip()
            if header is None:
                documents.append(Document(page_content=value))
            else:
                documents.append(Document(page_content=f"\n\n{header}\n{value}"))
        return documents

    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Split markdown text into ``(header, text)`` tuples.

        Args:
            markdown_text: Raw markdown content.

        Returns:
            A list of tuples in document order. The first element is the
            header title with its leading ``#`` marker removed, or ``None``
            for text that is not under any header. The second element is the
            text belonging to that section.
        """
        header_pattern = re.compile(r"^#+\s")
        markdown_tups: List[Tuple[Optional[str], str]] = []

        current_header: Optional[str] = None
        current_parts: List[str] = []

        for line in markdown_text.split("\n"):
            if header_pattern.match(line):
                current_text = "".join(current_parts)
                # Flush the pending section. Text that precedes the first
                # header is kept (with a ``None`` header) as long as it is
                # not blank, so that content is not silently dropped.
                if current_header is not None or current_text.strip():
                    markdown_tups.append((current_header, current_text))
                current_header = line
                current_parts = []
            else:
                current_parts.append(line + "\n")

        # The trailing section is always emitted, even when it is empty.
        markdown_tups.append((current_header, "".join(current_parts)))

        if current_header is not None:
            # At least one header was found: strip only the leading "#"
            # marker (so titles such as "C#" survive) and drop HTML-like
            # tags from the section text.
            return [
                (
                    None if key is None else re.sub(r"^#+\s*", "", key).strip(),
                    re.sub(r"<.*?>", "", value),
                )
                for key, value in markdown_tups
            ]

        # No header at all: the whole text is one section with its newlines
        # removed.
        return [(key, re.sub("\n", "", value)) for key, value in markdown_tups]

    def remove_images(self, content: str) -> str:
        """Remove wiki-style image embeds (``![[...]]``) from the content.

        Only the ``![[target]]`` form is handled here. The match is
        non-greedy so that text between two embeds on the same line is kept.
        """
        pattern = r"!\[\[(.*?)\]\]"
        return re.sub(pattern, "", content)

    def remove_hyperlinks(self, content: str) -> str:
        """Replace inline links ``[text](url)`` with just ``text``."""
        pattern = r"\[(.*?)\]\((.*?)\)"
        return re.sub(pattern, r"\1", content)

    def parse_tups(self, filepath: str) -> List[Tuple[Optional[str], str]]:
        """Read a markdown file and parse it into ``(header, text)`` tuples.

        The file is first opened with the configured encoding. On a decoding
        error, and if autodetection is enabled, each encoding returned by
        ``detect_file_encodings`` is tried in order. Hyperlinks and images
        are then removed according to the loader's settings before the text
        is split into sections.

        Args:
            filepath: Path of the file to read.

        Returns:
            The tuples produced by ``markdown_to_tups``.

        Raises:
            RuntimeError: If the file cannot be opened, or cannot be decoded
                with the configured encoding nor with any detected one.
        """
        try:
            with open(filepath, "r", encoding=self._encoding) as f:
                content = f.read()
        except UnicodeDecodeError as e:
            if not self._autodetect_encoding:
                raise RuntimeError(f"Error loading {filepath}") from e
            for encoding in detect_file_encodings(filepath):
                logger.debug("Trying encoding: %s", encoding.encoding)
                try:
                    with open(filepath, encoding=encoding.encoding) as f:
                        content = f.read()
                    break
                except UnicodeDecodeError:
                    continue
            else:
                # No candidate encoding worked: fail loudly instead of
                # returning an empty document.
                raise RuntimeError(f"Error loading {filepath}") from e
        except Exception as e:
            raise RuntimeError(f"Error loading {filepath}") from e

        if self._remove_hyperlinks:
            content = self.remove_hyperlinks(content)

        if self._remove_images:
            content = self.remove_images(content)

        return self.markdown_to_tups(content)