"""Markdown parser: splits a document into header-delimited content nodes.

Handles YAML frontmatter, ATX and setext headers, fenced (``` / ~~~) and
four-space-indented code blocks, and preamble/headerless content.
"""

import re
from pathlib import Path

import yaml

from .protocol import ContentNode, ParsedDocument
from ..index.utils import count_tokens

# Precompiled patterns (module level so they are compiled once).
_ATX_HEADER = re.compile(r"^(#{1,6})\s+(.+?)(?:\s+#+\s*)?$")
_SETEXT_H1 = re.compile(r"^=+\s*$")
_SETEXT_H2 = re.compile(r"^-+\s*$")
_FENCE_OPEN = re.compile(r"^(`{3,}|~{3,})")
_FENCE_CLOSE = re.compile(r"^(`{3,}|~{3,})\s*$")
_FRONTMATTER_FENCE = re.compile(r"^---\s*$")
_BLOCKQUOTE = re.compile(r"^>")
_LIST_ITEM = re.compile(r"^(?:[-+*]|\d{1,9}[.)])(?:\s+|$)")
_TABLE_ROW = re.compile(r"^\|.*\|$|^[^|]+\|[^|]+$")
# Thematic breaks ("***", "___", "- - -", ...): such a line is a horizontal
# rule, never paragraph text, so it must not become a setext header title.
_THEMATIC_BREAK = re.compile(r"^(?:(?:\*[ \t]*){3,}|(?:_[ \t]*){3,}|(?:-[ \t]*){3,})$")


class MarkdownParser:
    """Parses a markdown file into a flat list of header-delimited nodes."""

    def supported_extensions(self) -> list[str]:
        # NOTE(review): body not visible in the reviewed hunk — reconstructed;
        # confirm against the original file.
        return [".md", ".markdown"]

    def parse(self, file_path: str, **kwargs) -> ParsedDocument:
        """Parse *file_path* into a ParsedDocument.

        Keyword args:
            model: optional model name forwarded to token counting.
        """
        path = Path(file_path)
        # NOTE(review): the next two statements sit outside the visible hunk
        # and are reconstructed from usage — confirm against the original.
        model = kwargs.get("model")
        with open(path, encoding="utf-8") as f:
            content = f.read()

        lines = content.split("\n")
        lines, metadata, line_offset = self._strip_frontmatter(lines)
        headers = self._extract_headers(lines)
        nodes = self._build_nodes(headers, lines, model, line_offset)

        return ParsedDocument(
            doc_name=path.stem,
            nodes=nodes,
            metadata=metadata,
        )

    @staticmethod
    def _strip_frontmatter(lines: list[str]) -> tuple[list[str], dict | None, int]:
        """Strip YAML frontmatter (--- delimited) from the beginning of the file.

        Returns the remaining lines, raw frontmatter metadata, and removed
        line count. Content that merely starts with a thematic break (and is
        not parseable as a YAML mapping) is left untouched.
        """
        if not lines or not _FRONTMATTER_FENCE.match(lines[0]):
            return lines, None, 0

        for i in range(1, len(lines)):
            if _FRONTMATTER_FENCE.match(lines[i]):
                raw = "\n".join(lines[1:i])
                try:
                    parsed = yaml.safe_load(raw)
                except yaml.YAMLError:
                    return lines, None, 0
                # Only a YAML mapping counts as frontmatter; plain scalars or
                # lists mean the leading "---" was a thematic break.
                if not isinstance(parsed, dict):
                    return lines, None, 0

                remaining = lines[i + 1:]
                return remaining, {"frontmatter": raw}, i + 1

        # No closing fence found — not valid frontmatter, return as-is
        return lines, None, 0

    def _extract_headers(self, lines: list[str]) -> list[dict]:
        """Extract all ATX and setext headers, respecting fenced code blocks.

        Returns dicts with ``title``, ``level`` (1-6), and 1-based
        ``line_num`` (for setext headers: the text line, not the underline).
        """
        headers = []
        in_fence = False
        fence_pattern = None  # (fence char, min length) required to close
        # Line numbers already consumed as setext underlines.  Without this,
        # "Title\n====\n----" would wrongly promote the "====" underline into
        # the title of a second (H2) header.
        setext_underlines: set[int] = set()

        for line_num, line in enumerate(lines, 1):
            indent = self._indent_width(line)
            content = line.lstrip(" \t")
            stripped = content.strip()

            # Track fenced code blocks (fences indented 4+ are code content).
            fence_match = _FENCE_OPEN.match(content) if indent <= 3 else None
            if fence_match:
                marker = fence_match.group(1)
                if not in_fence:
                    in_fence = True
                    fence_pattern = (marker[0], len(marker))
                elif self._is_closing_fence(stripped, fence_pattern):
                    # Only close on the same char and at least as many of it.
                    in_fence = False
                    fence_pattern = None
                continue

            if in_fence:
                continue

            # Four-space/tab indented lines are code blocks, not headers.
            if indent >= 4:
                continue

            # ATX headers: "# Title" (optionally "# Title #" closing hashes).
            atx = _ATX_HEADER.match(stripped)
            if atx:
                headers.append({
                    "title": atx.group(2).strip(),
                    "level": len(atx.group(1)),
                    "line_num": line_num,
                })
                continue

            # Setext headers: this line is an underline, previous line is
            # paragraph text that was not itself consumed as an underline.
            if line_num >= 2 and (line_num - 1) not in setext_underlines:
                prev_line = lines[line_num - 2]  # previous line (0-indexed)
                prev = prev_line.strip()
                if (self._indent_width(prev_line) < 4
                        and self._is_setext_paragraph_candidate(prev)):
                    if _SETEXT_H1.match(stripped):
                        headers.append({
                            "title": prev,
                            "level": 1,
                            "line_num": line_num - 1,
                        })
                        setext_underlines.add(line_num)
                        continue
                    if _SETEXT_H2.match(stripped):
                        headers.append({
                            "title": prev,
                            "level": 2,
                            "line_num": line_num - 1,
                        })
                        setext_underlines.add(line_num)
                        continue

        return headers

    @staticmethod
    def _indent_width(line: str) -> int:
        """Leading indentation width in columns; tabs advance to 4-stops."""
        width = 0
        for char in line:
            if char == " ":
                width += 1
            elif char == "\t":
                width += 4 - (width % 4)
            else:
                break
        return width

    @staticmethod
    def _is_closing_fence(line: str, fence_pattern: tuple[str, int] | None) -> bool:
        """True if *line* closes the open fence described by *fence_pattern*."""
        if fence_pattern is None:
            return False

        close_match = _FENCE_CLOSE.match(line)
        if not close_match:
            return False

        marker = close_match.group(1)
        fence_char, fence_len = fence_pattern
        return marker[0] == fence_char and len(marker) >= fence_len

    @staticmethod
    def _is_setext_paragraph_candidate(line: str) -> bool:
        """True if *line* is paragraph text a setext underline may promote.

        Excludes blanks, fences, ATX headers, thematic breaks (e.g. "***" —
        a horizontal rule must not become a header title), blockquotes, list
        items, and table rows.
        """
        return bool(line
                    and not _FENCE_OPEN.match(line)
                    and not _ATX_HEADER.match(line)
                    and not _THEMATIC_BREAK.match(line)
                    and not _BLOCKQUOTE.match(line)
                    and not _LIST_ITEM.match(line)
                    and not _TABLE_ROW.match(line))

    def _build_nodes(
        self,
        headers: list[dict],
        lines: list[str],
        model: str | None,
        line_offset: int = 0,
    ) -> list[ContentNode]:
        """Build one node per header section, plus preamble/headerless nodes.

        *line_offset* is the number of frontmatter lines stripped before
        parsing, so node indexes refer to original file line numbers.
        """
        if not headers:
            # No headers — entire content becomes a single node
            text = "\n".join(lines).strip()
            if not text:
                return []
            tokens = count_tokens(text, model=model)
            index = self._first_content_line(lines, line_offset)
            return [ContentNode(content=text, tokens=tokens, index=index)]

        nodes = []

        # Content before the first header → preamble node (no title/level)
        first_header_line = headers[0]["line_num"]
        if first_header_line > 1:
            preamble = "\n".join(lines[: first_header_line - 1]).strip()
            if preamble:
                tokens = count_tokens(preamble, model=model)
                index = self._first_content_line(
                    lines[: first_header_line - 1],
                    line_offset,
                )
                nodes.append(ContentNode(content=preamble, tokens=tokens, index=index))

        # One node per header section, spanning up to the next header
        for i, header in enumerate(headers):
            start = header["line_num"] - 1
            end = headers[i + 1]["line_num"] - 1 if i + 1 < len(headers) else len(lines)
            text = "\n".join(lines[start:end]).strip()
            tokens = count_tokens(text, model=model)
            nodes.append(
                ContentNode(
                    content=text,
                    tokens=tokens,
                    title=header["title"],
                    index=header["line_num"] + line_offset,
                    level=header["level"],
                )
            )
        return nodes

    @staticmethod
    def _first_content_line(lines: list[str], line_offset: int) -> int:
        """1-based original-file line number of the first non-blank line."""
        for i, line in enumerate(lines, 1):
            if line.strip():
                return i + line_offset
        return line_offset + 1
def test_preamble_before_first_header(tmp_path):
    """Content before the first header should become a preamble node."""
    md = tmp_path / "preamble.md"
    md.write_text("""This is a preamble paragraph.

# First Header
Body text.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert len(result.nodes) == 2
    # Preamble node has no level/title
    assert result.nodes[0].level is None
    assert result.nodes[0].title is None
    assert "preamble paragraph" in result.nodes[0].content
    # Header node is normal
    assert result.nodes[1].level == 1
    assert result.nodes[1].title == "First Header"


def test_headerless_file(tmp_path):
    """A file with no headers should produce a single node."""
    md = tmp_path / "plain.md"
    md.write_text("Just some plain text\nwith multiple lines.\n")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert len(result.nodes) == 1
    assert result.nodes[0].level is None
    assert "plain text" in result.nodes[0].content


def test_empty_file(tmp_path):
    """An empty file should produce zero nodes."""
    md = tmp_path / "empty.md"
    md.write_text("")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert len(result.nodes) == 0


def test_yaml_frontmatter_stripped(tmp_path):
    """YAML frontmatter should be stripped and stored as metadata."""
    md = tmp_path / "front.md"
    md.write_text("""---
title: My Doc
author: Alice
---

# Introduction
Hello world.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert result.metadata is not None
    assert "title: My Doc" in result.metadata["frontmatter"]
    # Frontmatter should not appear in node content
    for node in result.nodes:
        assert "title: My Doc" not in node.content


def test_yaml_frontmatter_preserves_original_line_numbers(tmp_path):
    """Node indexes should use original file line numbers after frontmatter."""
    md = tmp_path / "front_lines.md"
    md.write_text("""---
title: My Doc
---

# Introduction
Hello world.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert result.nodes[0].title == "Introduction"
    assert result.nodes[0].index == 5


def test_thematic_break_at_start_not_stripped_as_frontmatter(tmp_path):
    """Markdown that starts with thematic breaks should not be stripped."""
    md = tmp_path / "thematic.md"
    md.write_text("""---

# Introduction
Hello world.

---

# Second
More text.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert result.metadata is None
    assert result.nodes[0].level is None
    assert result.nodes[0].content == "---"
    assert result.nodes[1].title == "Introduction"
    assert result.nodes[1].index == 3
    assert result.nodes[2].title == "Second"


def test_setext_h1(tmp_path):
    """Setext-style headers (=== / --- underlines) should be recognized."""
    md = tmp_path / "setext.md"
    md.write_text("""Main Title
==========

Some content here.

Sub Title
---------

More content.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert len(result.nodes) == 2
    assert result.nodes[0].title == "Main Title"
    assert result.nodes[0].level == 1
    assert result.nodes[1].title == "Sub Title"
    assert result.nodes[1].level == 2


@pytest.mark.parametrize(
    ("underline", "level"),
    [
        ("=", 1),
        ("-", 2),
    ],
)
def test_setext_single_character_underline(tmp_path, underline, level):
    """Setext underlines may be a single marker character."""
    md = tmp_path / "setext_single.md"
    md.write_text(f"""Title
{underline}

Body text.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert len(result.nodes) == 1
    assert result.nodes[0].title == "Title"
    assert result.nodes[0].level == level


@pytest.mark.parametrize("prefix", ["- item", "1. item", "> quote", "| A | B |"])
def test_setext_requires_paragraph_previous_line(tmp_path, prefix):
    """Setext underline should not turn list/quote/table lines into headers."""
    md = tmp_path / "not_setext.md"
    md.write_text(f"""{prefix}
---

# Real Header
Content.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    titles = [n.title for n in result.nodes if n.title]
    assert prefix not in titles
    assert "Real Header" in titles
    assert result.nodes[0].level is None
    assert prefix in result.nodes[0].content


def test_headers_inside_code_blocks_ignored(tmp_path):
    """Headers inside fenced code blocks should not be detected."""
    md = tmp_path / "code.md"
    md.write_text("""# Real Header

```
# Not a header
## Also not a header
```

# Another Real Header
More text.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    titles = [n.title for n in result.nodes if n.title]
    assert "Real Header" in titles
    assert "Another Real Header" in titles
    assert "Not a header" not in titles
    assert "Also not a header" not in titles


def test_indented_code_block_headers_ignored(tmp_path):
    """Headers inside four-space indented code blocks should not be detected."""
    md = tmp_path / "indented_code.md"
    md.write_text("""# Before

    # Not a header
    text

# After
Done.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    titles = [n.title for n in result.nodes if n.title]
    assert "Before" in titles
    assert "After" in titles
    assert "Not a header" not in titles


def test_three_space_indented_headers_allowed(tmp_path):
    """Headers indented up to three spaces should still be recognized."""
    md = tmp_path / "indented_header.md"
    md.write_text("""# Before

   # Still a header

# After
Done.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    titles = [n.title for n in result.nodes if n.title]
    assert "Before" in titles
    assert "Still a header" in titles
    assert "After" in titles


def test_tilde_code_fences(tmp_path):
    """Tilde fences (~~~) should also be respected."""
    md = tmp_path / "tilde.md"
    md.write_text("""# Header

~~~
# Fake header inside tilde fence
~~~

# Real Header After
Text.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    titles = [n.title for n in result.nodes if n.title]
    assert "Header" in titles
    assert "Real Header After" in titles
    assert "Fake header inside tilde fence" not in titles


def test_long_fence_requires_matching_close(tmp_path):
    """A ```` (4-backtick) opening fence should not close with ``` (3-backtick)."""
    md = tmp_path / "longfence.md"
    md.write_text("""# Before

````
# Inside fence
```
# Still inside — 3 backticks can't close 4-backtick fence
```
````

# After
Done.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    titles = [n.title for n in result.nodes if n.title]
    assert "Before" in titles
    assert "After" in titles
    # Use substring checks: the would-be bogus titles are the full line text
    # ("Still inside — 3 backticks ..."), so an exact-membership assertion
    # like `"Still inside" not in titles` would pass even on a broken parser.
    assert not any("Inside fence" in t for t in titles)
    assert not any("Still inside" in t for t in titles)


def test_fence_close_requires_only_marker_and_spaces(tmp_path):
    """Fence-like content with trailing text should not close the code block."""
    md = tmp_path / "fence_close.md"
    md.write_text("""# Before

```
```not a closing fence
# Still code
```

# After
Done.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    titles = [n.title for n in result.nodes if n.title]
    assert "Before" in titles
    assert "After" in titles
    assert "Still code" not in titles


def test_atx_closing_hashes(tmp_path):
    """ATX headers with closing hashes like '## Title ##' should parse cleanly."""
    md = tmp_path / "closing.md"
    md.write_text("""## Title ##
Content.
""")
    parser = MarkdownParser()
    result = parser.parse(str(md))
    assert result.nodes[0].title == "Title"
    assert result.nodes[0].level == 2