git @ Cat's Eye Technologies Cleandown / master src / marko / source.py
master

Tree @master (Download .tar.gz)

source.py @masterraw · history · blame

# Copyright (c) 2019 Frost Ming
#
# SPDX-License-Identifier: LicenseRef-MIT-X-Marko

from __future__ import annotations

import functools
import re
import types
from contextlib import contextmanager
from typing import TYPE_CHECKING, Generator, Match, Pattern, cast, overload

from marko.block import BlockElement, Document

if TYPE_CHECKING:
    from typing import Literal

    from marko.parser import Parser


def _preprocess_text(text: str) -> str:
    return text.replace("\r\n", "\n")


class Source:
    """Wrapper class on content to be parsed"""

    parser: Parser

    def __init__(self, text: str) -> None:
        self._buffer = _preprocess_text(text)
        self.pos = 0
        self._anchor = 0
        self._states: list[BlockElement] = []
        self.match: Match[str] | None = None
        #: Store temporary data during parsing.
        self.context = types.SimpleNamespace()

    @property
    def state(self) -> BlockElement:
        """Returns the current element state."""
        if not self._states:
            raise RuntimeError("Need to push a state first.")
        return self._states[-1]

    @property
    def root(self) -> Document:
        """Returns the root element, which is at the bottom of self._states."""
        if not self._states:
            raise RuntimeError("Need to push a state first.")
        return cast(Document, self._states[0])

    def push_state(self, element: BlockElement) -> None:
        """Push a new state to the state stack."""
        self._states.append(element)

    def pop_state(self) -> BlockElement:
        """Pop the top most state."""
        return self._states.pop()

    @contextmanager
    def under_state(self, element: BlockElement) -> Generator[Source, None, None]:
        """A context manager to enable a new state temporarily."""
        self.push_state(element)
        yield self
        self.pop_state()

    @property
    def exhausted(self) -> bool:
        """Indicates whether the source reaches the end."""
        return self.pos >= len(self._buffer)

    @property
    def prefix(self) -> str:
        """The prefix of each line when parsing."""
        return "".join(s._prefix for s in self._states)

    def _expect_re(self, regexp: Pattern[str] | str, pos: int) -> Match[str] | None:
        if isinstance(regexp, str):
            regexp = re.compile(regexp)
        return regexp.match(self._buffer, pos)

    @staticmethod
    @functools.lru_cache
    def match_prefix(prefix: str, line: str) -> int:
        """Check if the line starts with given prefix and
        return the position of the end of prefix.
        If the prefix is not matched, return -1.
        """
        m = re.match(prefix, line.expandtabs(4))
        if not m:
            if re.match(prefix, line.expandtabs(4).replace("\n", " " * 99 + "\n")):
                return len(line) - 1
            return -1
        pos = m.end()
        if pos == 0:
            return 0
        for i in range(1, len(line) + 1):
            if len(line[:i].expandtabs(4)) >= pos:
                return i
        return -1  # pragma: no cover

    def expect_re(self, regexp: Pattern[str] | str) -> Match[str] | None:
        """Test against the given regular expression and returns the match object.
        :param regexp: the expression to be tested.
        :returns: the match object.
        """
        prefix_len = self.match_prefix(
            self.prefix, self.next_line(require_prefix=False)  # type: ignore
        )
        if prefix_len >= 0:
            match = self._expect_re(regexp, self.pos + prefix_len)
            self.match = match
            return match
        else:
            return None

    @overload
    def next_line(self, require_prefix: Literal[False] = ...) -> str: ...

    @overload
    def next_line(self, require_prefix: Literal[True] = ...) -> str | None: ...

    def next_line(self, require_prefix: bool = True) -> str | None:
        """Return the next line in the source.

        :param require_prefix:  if False, the whole line will be returned.
            otherwise, return the line with prefix stripped or None if the prefix
            is not matched.
        """
        if require_prefix:
            m = self.expect_re(r"(?m)[^\n]*?$\n?")
        else:
            m = self._expect_re(r"(?m)[^\n]*$\n?", self.pos)
        self.match = m
        if m:
            return m.group()
        return None

    def consume(self) -> None:
        """Consume the body of source. ``pos`` will move forward."""
        if self.match:
            self.pos = self.match.end()
            if self.match.group()[-1:] == "\n":
                self._update_prefix()
            self.match = None

    def anchor(self) -> None:
        """Pin the current parsing position."""
        self._anchor = self.pos

    def reset(self) -> None:
        """Reset the position to the last anchor."""
        self.pos = self._anchor

    def _update_prefix(self) -> None:
        for s in self._states:
            if hasattr(s, "_second_prefix"):
                s._prefix = s._second_prefix  # type: ignore