Source code for danoan.perchance_tools.core.api

from danoan.perchance_tools.core import exception, model, utils
from danoan.llm_assistant.core import api as llm_assistant

from dataclasses import dataclass
from importlib import resources
from jinja2 import Template
import json
import logging
from pathlib import Path
import sys
import re
from typing import Any, Dict, Generator, List, TextIO, Tuple

LOG_LEVEL = logging.DEBUG

logger = logging.getLogger(__file__)
logger.setLevel(LOG_LEVEL)
handler = logging.StreamHandler(sys.stderr)
handler.setLevel(LOG_LEVEL)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
logger.addHandler(handler)

# -------------------- Markdown to YML --------------------


@dataclass
class _MarkdownMarker:
    level: int
    title: str
    character_span: Tuple[int, int]


@dataclass
class _MarkdownCue:
    level: int
    title: str
    lines: List[str]


def _collect_markdown_markers(
    markdown_text: str,
) -> Generator[_MarkdownMarker, None, None]:
    """
    Return the start and end characters of header titles in a markdown.

    >>> markdown_text = '''
    ... # Running journal
    ... This document register my running trainings.
    ... ## 1st April 2024
    ... Ran 5km in 30 minutes.
    ... '''
    >>> markers = list(_collect_markdown_markers(markdown_text))
    >>> m0 = markers[0]
    >>> m1 = markers[1]
    >>> assert( (m0.level,m0.title,m0.character_span) == (1,'Running journal', (1,18)))
    >>> assert( (m1.level,m1.title,m1.character_span) == (2,'1st April 2024', (64,81)))
    """
    header_start = re.compile(r"^[ ]*(#+)\s*([^\n]*)", re.MULTILINE)

    for m in header_start.finditer(markdown_text):
        header_marker, header_title = m.groups()
        header_level = len(header_marker)
        yield _MarkdownMarker(header_level, header_title, m.span())


def _parse_markdown(markdown_text: str) -> Generator[_MarkdownCue, None, None]:
    """
    Parse markdown text into (level, title, list of words) components.
    """
    m = list(_collect_markdown_markers(markdown_text))
    m.append(_MarkdownMarker(0, "root", (len(markdown_text), len(markdown_text))))

    for e, n in zip(m[:-1], m[1:]):
        level, title, span = e.level, e.title, e.character_span
        next_span = n.character_span

        start, end = span
        next_start, _ = next_span

        words = set()
        for w in markdown_text[end:next_start].split("\n"):
            _w = w.strip()
            if len(_w) == 0:
                continue
            words.add(_w)

        yield _MarkdownCue(level, title, list(sorted(words)))


[docs] def create_dict_from_markdown(markdown_stream: TextIO) -> model.WordDict: """ Create a dictionary from markdown text. Header titles are mapped to keys which values are dictionaries themselves created from theirs sub-headers. The text level is stored in the key `words` within its closest header-level dictionary. The `words` key stores the lines of the text. """ cues = [_MarkdownCue(0, "root", [])] cues.extend(list(_parse_markdown(markdown_stream.read()))) visited = [False] * len(cues) def _create_tree(start_index: int): mc = cues[start_index] if visited[start_index]: return None visited[start_index] = True root: Dict[str, Any] = {mc.title.strip(): {}} if len(mc.lines) > 0: root[mc.title.strip()] = {"words": mc.lines} return root for index in range(start_index + 1, len(cues)): c = cues[index] if c.level > mc.level: t = _create_tree(index) if t: root[mc.title.strip()].update(t) elif c.level <= mc.level: return root return root return model.WordDict(_create_tree(0))
# -------------------- Correct Words -------------------- def _get_asset(asset_relative_path: Path): ASSETS_PACKAGE = "danoan.perchance_tools.assets" s = [] t = asset_relative_path while t != t.parent: s.append(t.name) t = t.parent s.reverse() asset_dot_path = ".".join(s[:-1]) resource_dot_path = f"{ASSETS_PACKAGE}.{asset_dot_path}" return resources.path(resource_dot_path, s[-1]) def _read_asset_as_text(asset_relative_path: Path): asset_path = _get_asset(asset_relative_path) with open(asset_path, "r") as f: return f.read() def _setup_llm_assistant(): instance = llm_assistant.LLMAssistant() if not instance.config: config = llm_assistant.get_configuration() if not config.use_cache: raise exception.CacheNotConfiguredError() instance.setup(config) def _render_correct_words_user_prompt(categories: List[str], words: List[str]): template_data = _read_asset_as_text( Path("prompts") / "correct_words" / "user.txt.tpl" ) template = Template(template_data) return template.render(categories=categories, words=words) def _call_correct_word_prompt(user_prompt: str, language, model: str): system_prompt = _read_asset_as_text( Path("prompts") / "correct_words" / "system.txt.tpl" ) full_examples = _read_asset_as_text( Path("prompts") / "correct_words" / language.alpha_3 / "full-examples.txt" ) data = { "language": language.name, "full_examples": full_examples, } prompt = llm_assistant.model.PromptConfiguration( "correct-words", system_prompt, user_prompt ) _setup_llm_assistant() result = llm_assistant.custom(prompt, model=model, **data) return result.content def _ensure_json_list_string(text_response: str): response_lines = text_response.splitlines() first_line = 0 for i, line in enumerate(response_lines): if line and line[0] == "[": first_line = i break return "".join(response_lines[first_line:]) def _find_corrections(word_dict: model.WordDict, language, model: str): for key_path in utils.collect_key_path(word_dict, "words"): categories = key_path["path"] render_categories = [x for x in categories] render_categories.remove("root") words = key_path["words"] user_prompt = _render_correct_words_user_prompt(render_categories, words) r = _call_correct_word_prompt(user_prompt, language, model) r = _ensure_json_list_string(r) try: correction = {} correction["key"] = categories correction["replace_pairs"] = json.loads(r) if correction["replace_pairs"] and len(correction["replace_pairs"]) > 0: logger.debug(categories) logger.debug(correction["replace_pairs"]) except json.JSONDecodeError as ex: logger.debug("Error decoding LLM response as json") logger.debug(categories) logger.debug(r) logger.debug(ex) # If an error is found while generating the JSON, I assume the list of # corrections is empty correction["replace_pairs"] = [] yield correction
[docs] def replace_words( word_dict: model.WordDict, correction_instructions: List[model.ReplaceInstructions], ): """ Executes a series of replace operations in a WordDict. The ReplaceInstruction has a key and a list of replace pairs with old and new word. """ for instruction in correction_instructions: u = word_dict for category in instruction.key: u = u[category] set_of_words = set(u["words"].extract()) for correction_pair in instruction.replace_pairs: original, correction = correction_pair set_of_words.remove(original) set_of_words.add(correction) u["words"] = list(sorted(set_of_words)) return word_dict
[docs] def find_corrections( word_dict: model.WordDict, language ) -> List[model.ReplaceInstructions]: """ Find typos and mispelled words in the dictionary. Runs a prompt over a LLM to find mispelled words in the WordDict and return a list of ReplaceInstructions. """ return [ model.ReplaceInstructions(**x) for x in _find_corrections(word_dict, language, "gpt-4o") ]
# -------------------- Perchance format --------------------
[docs] def translate(word: str, from_language: str, to_language: str) -> List[str]: """ Translate a word from one language to another. Runs a prompt over a LLM to get the translation. """ system_prompt_template = _read_asset_as_text( Path("prompts") / "translate" / "system.txt.tpl" ) user_prompt_template = _read_asset_as_text( Path("prompts") / "translate" / "user.txt.tpl" ) data = { "word": word, "from_language_name": from_language, "to_language_name": to_language, } system_prompt = Template(system_prompt_template).render(**data) user_prompt = Template(user_prompt_template).render(**data) prompt = llm_assistant.model.PromptConfiguration( "translate-word", system_prompt, user_prompt ) _setup_llm_assistant() result = llm_assistant.custom(prompt, model="gpt-3.5-turbo") response = result.content if not response: return [] else: try: response_data = json.loads(response) if type(response_data) is list: return response_data else: logger.debug("Expected a list") raise TypeError() except json.JSONDecodeError: logger.debug("Error decoding LLM response as json") logger.debug(word) return [word]