Skip to content

DocumentToolkit

Document toolkit for parsing documents and supporting Q&A.

Supported backends (selected via the `parser` config key): chunkr, pymupdf, unstructured.

DocumentToolkit

Bases: AsyncBaseToolkit

Source code in utu/tools/document_toolkit.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class DocumentToolkit(AsyncBaseToolkit):
    """Toolkit that parses documents to text and answers questions about them.

    The parsing backend is chosen by the ``parser`` key of the toolkit config
    (``chunkr``, ``pymupdf`` or ``unstructured``).
    """

    def __init__(self, config: ToolkitConfig = None) -> None:
        """Initialize the DocumentToolkit with the configured parser and LLM.

        Raises:
            ValueError: If the configured ``parser`` is not a supported backend.
        """
        super().__init__(config)
        parser_name = self.config.config.get("parser")
        # Each backend is imported inside the branch that uses it, so only the
        # selected backend module is imported.
        if parser_name == "chunkr":
            from .documents.chunkr_parser import ChunkrParser

            self.parser = ChunkrParser(self.config.config)
        elif parser_name == "pymupdf":
            from .documents.pdf_parser import PDFParser

            self.parser = PDFParser(self.config.config)
        elif parser_name == "unstructured":
            from .documents.unstructured_parser import UnstructuredParser

            self.parser = UnstructuredParser(self.config.config)
        else:
            raise ValueError(f"Unsupported parser: {parser_name}")
        # Maximum number of characters of document text sent to the LLM in document_qa.
        self.text_limit = self.config.config.get("text_limit", 100_000)
        self.llm = SimplifiedAsyncOpenAI(**self.config.config_llm.model_provider.model_dump())
        # Maps the md5 returned by handle_path to the local path of the document.
        self.md5_to_path = {}

    async def parse_document(self, md5: str) -> str:
        """Parse the document registered under ``md5`` with the configured parser.

        Args:
            md5 (str): Key previously returned by ``handle_path``.

        Raises:
            KeyError: If ``md5`` has not been registered via ``handle_path``.
        """
        path = self.md5_to_path[md5]
        logger.info(f"[tool] parse_document: {path}")
        return await self.parser.parse(path)

    def handle_path(self, path_or_url: str) -> str:
        """Register a document and return the md5 key under which it is tracked.

        For a web URL the file is downloaded to ``CACHE_DIR / "documents"`` (skipped
        when the target file already exists) and the local copy is recorded.
        Otherwise ``path_or_url`` itself is recorded.

        Args:
            path_or_url (str): Local path or URL to a document.

        Returns:
            str: The md5 key to use with ``md5_to_path`` / ``parse_document``.
        """
        md5 = FileUtils.get_file_md5(path_or_url)
        logger.info(f"md5 for {path_or_url}: {md5}")
        if FileUtils.is_web_url(path_or_url):
            # Cache file name is the md5 plus the original file extension.
            fn = CACHE_DIR / "documents" / f"{md5}{FileUtils.get_file_ext(path_or_url)}"
            fn.parent.mkdir(parents=True, exist_ok=True)
            if not fn.exists():
                FileUtils.download_file(url=path_or_url, save_path=fn)
                # Log only after the download call returned, and name the real target.
                logger.info(f"Downloaded document file {path_or_url} to {fn}")
            self.md5_to_path[md5] = fn  # record md5 to map
        else:
            self.md5_to_path[md5] = path_or_url
        return md5

    @register_tool
    async def document_parse(self, document_path: str, chunk_size: int = None, chunk_id: int = None) -> str:
        """Parse document and return the processed text.
        - Supported file types: pdf, docx, pptx, xlsx, xls, ppt, doc
        - If the document is too large, it will be truncated to the first chunk_size characters.
        - If pass chunk_id, it will return the chunk text begin with chunk_id * chunk_size.

        Args:
            document_path (str): Local path or URL to a document.
            chunk_size (int, optional): Number of characters to process at once. Defaults to 10_000.
            chunk_id (int, optional): Chunk ID to start from. Defaults to 0.

        Returns:
            str: JSON object with ``path``, ``total_chars``, ``is_chunked`` and ``content``;
                when chunked it also carries ``chunk_size``, ``chunk_total`` and ``chunk_id``.

        Raises:
            ValueError: If ``chunk_size`` or ``chunk_id`` is negative.
        """
        # None and 0 both fall back to the defaults.
        chunk_size = chunk_size or 10_000
        chunk_id = chunk_id or 0
        if chunk_size < 0 or chunk_id < 0:
            raise ValueError(
                f"chunk_size and chunk_id must be non-negative, got chunk_size={chunk_size}, chunk_id={chunk_id}"
            )

        md5 = self.handle_path(document_path)
        document_markdown = await self.parse_document(md5)

        meta = {
            # Downloaded documents are stored as path objects, which json.dumps cannot serialize.
            "path": str(self.md5_to_path[md5]),
            "total_chars": len(document_markdown),
        }
        if meta["total_chars"] > chunk_size:
            meta["is_chunked"] = True
            meta["chunk_size"] = chunk_size
            # Ceiling division: number of chunks needed to cover the whole text.
            meta["chunk_total"] = (meta["total_chars"] + chunk_size - 1) // chunk_size
            meta["chunk_id"] = chunk_id
            meta["content"] = document_markdown[chunk_id * chunk_size : (chunk_id + 1) * chunk_size]
        else:
            meta["is_chunked"] = False
            meta["content"] = document_markdown
        return json.dumps(meta, ensure_ascii=False)

    @register_tool
    async def document_qa(self, document_path: str, question: str | None = None) -> str:
        """Get file content summary or answer questions about attached document.

        Supported file types: pdf, docx, pptx, xlsx, xls, ppt, doc

        Args:
            document_path (str): Local path or URL to a document.
            question (str, optional): The question to answer. If not provided, return a summary of the document.

        Returns:
            str: The LLM output; when no question was given it is prefixed with a note saying so.
        """
        md5 = self.handle_path(document_path)
        document_markdown = await self.parse_document(md5)
        # Cut the text at text_limit characters and mark the cut with a trailing "...".
        if len(document_markdown) > self.text_limit:
            document_markdown = document_markdown[: self.text_limit] + "\n..."
        messages = [
            {"role": "system", "content": TOOL_PROMPTS["document_sp"]},
            {"role": "user", "content": document_markdown},
        ]
        if question:
            messages.append({"role": "user", "content": TOOL_PROMPTS["document_qa"].format(question=question)})
        else:
            messages.append({"role": "user", "content": TOOL_PROMPTS["document_summary"]})
        output = await self.llm.query_one(messages=messages, **self.config.config_llm.model_params.model_dump())
        if not question:
            output = (
                f"You did not provide a particular question, so here is a detailed caption for the document: {output}"
            )
        return output

tools_map property

tools_map: dict[str, Callable]

Lazily loaded tools map; collects the tools registered by @register_tool.

__init__

__init__(config: ToolkitConfig = None) -> None

Initialize the DocumentToolkit with the configured parser and LLM.

Source code in utu/tools/document_toolkit.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(self, config: ToolkitConfig = None) -> None:
    """Set up the parser backend, the text limit and the LLM client from the config.

    Raises:
        ValueError: If the ``parser`` config value is not ``chunkr``, ``pymupdf`` or ``unstructured``.
    """
    super().__init__(config)
    settings = self.config.config
    backend = settings.get("parser")
    # The backend class is imported inside the branch that selects it.
    if backend == "chunkr":
        from .documents.chunkr_parser import ChunkrParser as parser_cls
    elif backend == "pymupdf":
        from .documents.pdf_parser import PDFParser as parser_cls
    elif backend == "unstructured":
        from .documents.unstructured_parser import UnstructuredParser as parser_cls
    else:
        raise ValueError(f"Unsupported parser: {self.config.config.get('parser')}")
    self.parser = parser_cls(settings)
    self.text_limit = settings.get("text_limit", 100_000)
    provider_kwargs = self.config.config_llm.model_provider.model_dump()
    self.llm = SimplifiedAsyncOpenAI(**provider_kwargs)
    # md5 key -> document path, filled in by handle_path.
    self.md5_to_path = {}

document_parse async

document_parse(
    document_path: str,
    chunk_size: int = None,
    chunk_id: int = None,
) -> str

Parse document and return the processed text.

- Supported file types: pdf, docx, pptx, xlsx, xls, ppt, doc
- If the document is too large, it will be truncated to the first chunk_size characters.
- If chunk_id is passed, the returned text is the chunk beginning at chunk_id * chunk_size.

Parameters:

Name Type Description Default
document_path str

Local path or URL to a document.

required
chunk_size int

Number of characters to process at once. Defaults to 10_000.

None
chunk_id int

Chunk ID to start from. Defaults to 0.

None
Source code in utu/tools/document_toolkit.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@register_tool
async def document_parse(self, document_path: str, chunk_size: int = None, chunk_id: int = None) -> str:
    """Parse document and return the processed text.
    - Supported file types: pdf, docx, pptx, xlsx, xls, ppt, doc
    - If the document is too large, it will be truncated to the first chunk_size characters.
    - If pass chunk_id, it will return the chunk text begin with chunk_id * chunk_size.

    Args:
        document_path (str): Local path or URL to a document.
        chunk_size (int, optional): Number of characters to process at once. Defaults to 10_000.
        chunk_id (int, optional): Chunk ID to start from. Defaults to 0.

    Returns:
        str: JSON object with ``path``, ``total_chars``, ``is_chunked`` and ``content``;
            when chunked it also carries ``chunk_size``, ``chunk_total`` and ``chunk_id``.

    Raises:
        ValueError: If ``chunk_size`` or ``chunk_id`` is negative.
    """
    # None and 0 both fall back to the defaults.
    chunk_size = chunk_size or 10_000
    chunk_id = chunk_id or 0
    if chunk_size < 0 or chunk_id < 0:
        raise ValueError(
            f"chunk_size and chunk_id must be non-negative, got chunk_size={chunk_size}, chunk_id={chunk_id}"
        )

    md5 = self.handle_path(document_path)
    document_markdown = await self.parse_document(md5)

    meta = {
        # The recorded path may be a path object rather than a str; json.dumps
        # cannot serialize path objects, so store its string form.
        "path": str(self.md5_to_path[md5]),
        "total_chars": len(document_markdown),
    }
    if meta["total_chars"] > chunk_size:
        meta["is_chunked"] = True
        meta["chunk_size"] = chunk_size
        # Ceiling division: number of chunks needed to cover the whole text.
        meta["chunk_total"] = (meta["total_chars"] + chunk_size - 1) // chunk_size
        meta["chunk_id"] = chunk_id
        meta["content"] = document_markdown[chunk_id * chunk_size : (chunk_id + 1) * chunk_size]
    else:
        meta["is_chunked"] = False
        meta["content"] = document_markdown
    return json.dumps(meta, ensure_ascii=False)

document_qa async

document_qa(
    document_path: str, question: str | None = None
) -> str

Get file content summary or answer questions about attached document.

Supported file types: pdf, docx, pptx, xlsx, xls, ppt, doc

Parameters:

Name Type Description Default
document_path str

Local path or URL to a document.

required
question str

The question to answer. If not provided, return a summary of the document.

None
Source code in utu/tools/document_toolkit.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@register_tool
async def document_qa(self, document_path: str, question: str | None = None) -> str:
    """Answer a question about a document, or summarize it when no question is given.

    Supported file types: pdf, docx, pptx, xlsx, xls, ppt, doc

    Args:
        document_path (str): Local path or URL to a document.
        question (str, optional): The question to answer. If not provided, return a summary of the document.
    """
    doc_key = self.handle_path(document_path)
    text = await self.parse_document(doc_key)
    # Keep at most text_limit characters and flag the cut with a trailing "...".
    if len(text) > self.text_limit:
        text = text[: self.text_limit] + "\n..."

    messages = [
        {"role": "system", "content": TOOL_PROMPTS["document_sp"]},
        {"role": "user", "content": text},
    ]
    # The final user turn is either the formatted question or the summary prompt.
    final_prompt = (
        TOOL_PROMPTS["document_qa"].format(question=question) if question else TOOL_PROMPTS["document_summary"]
    )
    messages.append({"role": "user", "content": final_prompt})

    llm_kwargs = self.config.config_llm.model_params.model_dump()
    output = await self.llm.query_one(messages=messages, **llm_kwargs)
    if question:
        return output
    return f"You did not provide a particular question, so here is a detailed caption for the document: {output}"

setup_env

setup_env(env: _BaseEnv) -> None

Setup env and workspace.

Source code in utu/tools/base.py
35
36
37
38
39
40
def setup_env(self, env: "_BaseEnv") -> None:
    """Attach the environment to the toolkit and set up its workspace.

    When ``env_mode`` is ``"e2b"`` the environment's ``sandbox`` is also stored
    as ``e2b_sandbox``.
    """
    self.env = env
    uses_e2b = self.env_mode == "e2b"  # env is expected to be an E2BEnv in this mode
    if uses_e2b:
        self.e2b_sandbox = env.sandbox
    self.setup_workspace()

setup_workspace

setup_workspace(workspace_root: str = None)

Setup workspace. Implemented inside specific toolkits.

Source code in utu/tools/base.py
42
43
44
def setup_workspace(self, workspace_root: str = None):
    """Set up the workspace.

    No-op here; specific toolkits provide the implementation.
    """
    return None

build async

build() -> None

Build/initialize the toolkit. Override in subclasses that need async initialization.

Source code in utu/tools/base.py
46
47
48
async def build(self) -> None:
    """Build/initialize the toolkit.

    No-op here; subclasses that need async initialization override it.
    """
    return None

cleanup async

cleanup() -> None

Cleanup toolkit resources. Override in subclasses that need cleanup.

Source code in utu/tools/base.py
50
51
52
async def cleanup(self) -> None:
    """Clean up toolkit resources.

    No-op here; subclasses that hold resources override it.
    """
    return None

get_tools_map_func

get_tools_map_func() -> dict[str, Callable]

Get tools map. It will filter tools by config.activated_tools if it is not None.

Source code in utu/tools/base.py
68
69
70
71
72
73
74
75
76
77
def get_tools_map_func(self) -> dict[str, Callable]:
    """Return the tools map, restricted to ``config.activated_tools`` when that is set.

    With no activated tools configured, the full ``tools_map`` is returned as-is.
    """
    # Guard clause: nothing to filter.
    if not self.config.activated_tools:
        return self.tools_map

    assert all(tool_name in self.tools_map for tool_name in self.config.activated_tools), (
        f"Error config activated tools: {self.config.activated_tools}! available tools: {self.tools_map.keys()}"
    )
    selected = {}
    for tool_name in self.config.activated_tools:
        selected[tool_name] = self.tools_map[tool_name]
    return selected

get_tools_in_agents

get_tools_in_agents() -> list[FunctionTool]

Get tools in openai-agents format.

Source code in utu/tools/base.py
79
80
81
82
83
84
85
86
87
88
89
90
def get_tools_in_agents(self) -> list[FunctionTool]:
    """Wrap every tool from ``get_tools_map_func`` with ``function_tool`` (openai-agents format)."""
    active_tools = self.get_tools_map_func()
    # strict_mode=False: turn off strict mode
    return [function_tool(tool, strict_mode=False) for tool in active_tools.values()]

get_tools_in_openai

get_tools_in_openai() -> list[dict]

Get tools in OpenAI format.

Source code in utu/tools/base.py
92
93
94
95
def get_tools_in_openai(self) -> list[dict]:
    """Return the tools converted to OpenAI format.

    Each tool from ``get_tools_in_agents`` is passed through
    ``ChatCompletionConverter.tool_to_openai``.
    """
    converted = []
    for agent_tool in self.get_tools_in_agents():
        converted.append(ChatCompletionConverter.tool_to_openai(agent_tool))
    return converted

get_tools_in_mcp

get_tools_in_mcp() -> list[Tool]

Get tools in MCP format.

Source code in utu/tools/base.py
 97
 98
 99
100
def get_tools_in_mcp(self) -> list[types.Tool]:
    """Return the tools converted to MCP format.

    Each tool from ``get_tools_in_agents`` is passed through
    ``MCPConverter.function_tool_to_mcp``.
    """
    converted = []
    for agent_tool in self.get_tools_in_agents():
        converted.append(MCPConverter.function_tool_to_mcp(agent_tool))
    return converted

call_tool async

call_tool(name: str, arguments: dict) -> str

Call a tool by its name.

Source code in utu/tools/base.py
102
103
104
105
106
107
108
async def call_tool(self, name: str, arguments: dict) -> str:
    """Look up a tool by name and await it with ``arguments`` as keyword arguments.

    Raises:
        ValueError: If ``name`` is not in the map returned by ``get_tools_map_func``.
    """
    available = self.get_tools_map_func()
    if name in available:
        return await available[name](**arguments)
    raise ValueError(f"Tool {name} not found")