Skip to content

BashToolkit

https://github.com/pexpect/pexpect @ii-agent/src/ii_agent/tools/bash_tool.py

--- https://www.anthropic.com/engineering/swe-bench-sonnet --- Run commands in a bash shell

  • When invoking this tool, the contents of the "command" parameter do NOT need to be XML-escaped.

  • You don't have access to the internet via this tool.

  • You do have access to a mirror of common linux and python packages via apt and pip.

  • State is persistent across command calls and discussions with the user.

  • To inspect a particular line range of a file, e.g. lines 10-25, try 'sed -n 10,25p /path/to/the/file'.

  • Please avoid commands that may produce a very large amount of output.

  • Please run long-lived commands in the background, e.g. 'sleep 10 &' or start a server in the background.

BashToolkit

Bases: AsyncBaseToolkit

Source code in utu/tools/bash_toolkit.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class BashToolkit(AsyncBaseToolkit):
    """Toolkit exposing one persistent shell session driven through pexpect.

    A single shell process is spawned per toolkit instance and reused by every
    ``run_bash`` call, so shell state (working directory, variables, ...)
    carries over from one call to the next.
    """

    # ANSI CSI escape sequences; compiled once instead of on every command.
    _ANSI_ESCAPE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")

    def __init__(self, config: ToolkitConfig = None) -> None:
        """Spawn the shell and move it into the configured workspace.

        Reads two optional keys from ``self.config.config``:
        ``timeout`` (default 60, handed to pexpect as the spawn timeout) and
        ``workspace_root`` (default ``"/tmp/"``).
        """
        super().__init__(config)
        # self.require_confirmation = self.config.config.get("require_confirmation", False)
        # self.command_filters = self.config.config.get("command_filters", [])
        self.timeout = self.config.config.get("timeout", 60)
        # Commands containing any of these substrings are refused by run_bash.
        self.banned_command_strs = [
            "git init",
            "git commit",
            "git add",
        ]

        self.child, self.custom_prompt = self.start_persistent_shell(timeout=self.timeout)

        workspace_root = self.config.config.get("workspace_root", "/tmp/")
        self.setup_workspace(workspace_root)

    def setup_workspace(self, workspace_root: str):
        """Create ``workspace_root`` if needed and ``cd`` the shell into it.

        Args:
            workspace_root: Directory that becomes the shell's working directory.
        """
        # Local import so this block does not depend on a file-level import.
        import shlex

        workspace_dir = pathlib.Path(workspace_root)
        workspace_dir.mkdir(parents=True, exist_ok=True)
        self.workspace_root = workspace_root
        if sys.platform == "win32":
            # cmd.exe does not understand POSIX quoting; pass the path through unchanged.
            target = workspace_root
        else:
            # Quote so paths containing spaces or shell metacharacters reach `cd` intact.
            target = shlex.quote(str(workspace_root))
        self.run_command(self.child, self.custom_prompt, f"cd {target}")

    @staticmethod
    def start_persistent_shell(timeout: int):
        """Spawn a shell with a known prompt and return ``(child, custom_prompt)``.

        Args:
            timeout: Timeout handed to ``pexpect.spawn``.

        Returns:
            The pexpect child and the prompt string used to detect command completion.
        """
        # https://github.com/pexpect/pexpect/issues/321

        if sys.platform == "win32":
            # NOTE(review): pexpect's documentation says `pexpect.spawn` is not
            # available on Windows -- confirm this branch is actually reachable.
            child = pexpect.spawn("cmd.exe", encoding="utf-8", echo=False, timeout=timeout)
            custom_prompt = "PROMPT_>"
            child.sendline(f"prompt {custom_prompt}")
            child.expect(custom_prompt)
        else:
            child = pexpect.spawn("/bin/bash", encoding="utf-8", echo=False, timeout=timeout)
            # Set a known prompt: a fixed marker string that is unlikely to
            # appear in ordinary command output, so the prompt can be detected.
            custom_prompt = "PEXPECT_PROMPT>> "
            child.sendline("stty -onlcr")
            child.sendline("unset PROMPT_COMMAND")
            child.sendline(f"PS1='{custom_prompt}'")
            # Force an initial read until the newly set prompt shows up
            child.expect(custom_prompt)
        # Return from BOTH branches; previously the win32 path fell through and
        # returned None, which broke the caller's tuple unpacking.
        return child, custom_prompt

    @staticmethod
    def run_command(child, custom_prompt: str, cmd: str) -> str:
        """Send ``cmd`` to the shell and return its cleaned output.

        Args:
            child: The pexpect child returned by ``start_persistent_shell``.
            custom_prompt: Prompt string to wait for after sending the command.
            cmd: Command line to send.

        Returns:
            Text printed before the next prompt, stripped of surrounding
            whitespace and ANSI escape sequences.

        Raises:
            Whatever ``child.expect`` raises when the prompt does not show up
            (e.g. on timeout or when the shell has exited).
        """
        child.sendline(cmd)
        # Wait until we see the prompt again.
        # pexpect puts the matched prompt in child.after and everything before it in child.before.
        child.expect(custom_prompt)

        raw_output = child.before.strip()
        clean_output = BashToolkit._ANSI_ESCAPE.sub("", raw_output)

        # Removing a leading escape sequence can expose a carriage return that
        # the earlier strip() could not reach.
        if clean_output.startswith("\r"):
            clean_output = clean_output[1:]

        return clean_output

    def _shell_is_healthy(self) -> bool:
        """Return True when the shell answers a trivial echo probe correctly."""
        try:
            echo_result = self.run_command(self.child, self.custom_prompt, "echo hello")
        except Exception:  # pylint: disable=broad-except
            return False
        # Explicit comparison rather than `assert`, which is stripped under -O.
        return echo_result.strip() == "hello"

    def _restart_shell(self) -> None:
        """Replace the current shell with a fresh one placed back in the workspace."""
        try:
            # Close the stale child so its process and pty are not leaked.
            self.child.close(force=True)
        except Exception:  # pylint: disable=broad-except
            # Deliberate best effort: the old process may already be gone.
            pass
        self.child, self.custom_prompt = self.start_persistent_shell(self.timeout)
        # A new shell starts in the parent's working directory; move it back.
        workspace_root = getattr(self, "workspace_root", None)
        if workspace_root is not None:
            self.setup_workspace(workspace_root)

    @register_tool
    async def run_bash(self, command: str) -> str:
        """Execute a bash command in your workspace and return its output.

        Args:
            command: The command to execute
        """
        # 1) filter: change command before execution. E.g. used in SSH or Docker.
        # original_command = command
        # command = self.apply_filters(original_command)
        # if command != original_command:
        #     logger.info(f"Command filtered: {original_command} -> {command}")

        # 2) banned command check
        for banned_str in self.banned_command_strs:
            if banned_str in command:
                return f"Command not executed due to banned string in command: {banned_str} found in {command}."

        # if self.require_confirmation:
        #     ...

        # Confirm the shell is still usable (an earlier command may have timed
        # out or killed it); otherwise start over with a fresh shell.
        if not self._shell_is_healthy():
            self._restart_shell()

        # 3) Execute the command and capture output
        try:
            result = self.run_command(self.child, self.custom_prompt, command)
            return str(
                {
                    "command output": result,
                }
            )
        except Exception as e:  # pylint: disable=broad-except
            # Report the failure to the caller as text instead of raising.
            return str(
                {
                    "error": str(e),
                }
            )

tools_map property

tools_map: dict[str, Callable]

Lazily loaded tools map: collects the tools registered with @register_tool.

run_bash async

run_bash(command: str) -> str

Execute a bash command in your workspace and return its output.

Parameters:

Name Type Description Default
command str

The command to execute

required
Source code in utu/tools/bash_toolkit.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@register_tool
async def run_bash(self, command: str) -> str:
    """Execute a bash command in your workspace and return its output.

    Args:
        command: The command to execute
    """
    # 1) filter: change command before execution. E.g. used in SSH or Docker.
    # original_command = command
    # command = self.apply_filters(original_command)
    # if command != original_command:
    #     logger.info(f"Command filtered: {original_command} -> {command}")

    # 2) banned command check
    for banned_str in self.banned_command_strs:
        if banned_str in command:
            return f"Command not executed due to banned string in command: {banned_str} found in {command}."

    # if self.require_confirmation:
    #     ...

    # Confirm the shell is still usable. Compare explicitly instead of using
    # `assert`, which is stripped under -O and would make this probe vacuous.
    try:
        echo_result = self.run_command(self.child, self.custom_prompt, "echo hello")
        shell_ok = echo_result.strip() == "hello"
    except Exception:  # pylint: disable=broad-except
        shell_ok = False

    if not shell_ok:
        try:
            # Close the stale child so its process and pty are not leaked.
            self.child.close(force=True)
        except Exception:  # pylint: disable=broad-except
            # Deliberate best effort: the old process may already be gone.
            pass
        self.child, self.custom_prompt = self.start_persistent_shell(self.timeout)
        # A new shell starts in the parent's working directory; move it back
        # into the workspace so later commands run where the caller expects.
        workspace_root = getattr(self, "workspace_root", None)
        if workspace_root is not None:
            self.setup_workspace(workspace_root)

    # 3) Execute the command and capture output
    try:
        result = self.run_command(self.child, self.custom_prompt, command)
        return str(
            {
                "command output": result,
            }
        )
    except Exception as e:  # pylint: disable=broad-except
        # Report the failure to the caller as text instead of raising.
        return str(
            {
                "error": str(e),
            }
        )

get_tools_map_func

get_tools_map_func() -> dict[str, Callable]

Get tools map. It will filter tools by config.activated_tools if it is not None.

Source code in utu/tools/base.py
36
37
38
39
40
41
42
43
44
45
def get_tools_map_func(self) -> dict[str, Callable]:
    """Return the name-to-callable map of usable tools.

    When ``config.activated_tools`` is empty or unset, the full ``tools_map``
    is returned as-is. Otherwise only the listed tools are returned, after
    asserting that every listed name exists in ``tools_map``.
    """
    selected_names = self.config.activated_tools
    if not selected_names:
        return self.tools_map

    # Every activated name must correspond to a registered tool.
    assert all(tool_name in self.tools_map for tool_name in selected_names), (
        f"Error config activated tools: {self.config.activated_tools}! available tools: {self.tools_map.keys()}"
    )
    return {tool_name: self.tools_map[tool_name] for tool_name in selected_names}

get_tools_in_agents

get_tools_in_agents() -> list[FunctionTool]

Get tools in openai-agents format.

Source code in utu/tools/base.py
47
48
49
50
51
52
53
54
55
56
57
58
def get_tools_in_agents(self) -> list[FunctionTool]:
    """Wrap every usable tool with ``function_tool`` for openai-agents."""
    available = self.get_tools_map_func()
    # strict_mode=False: turn off strict mode
    return [function_tool(handler, strict_mode=False) for handler in available.values()]

get_tools_in_openai

get_tools_in_openai() -> list[dict]

Get tools in OpenAI format.

Source code in utu/tools/base.py
60
61
62
63
def get_tools_in_openai(self) -> list[dict]:
    """Return the usable tools converted to the OpenAI tool format."""
    converted = []
    for agent_tool in self.get_tools_in_agents():
        converted.append(ChatCompletionConverter.tool_to_openai(agent_tool))
    return converted

get_tools_in_mcp

get_tools_in_mcp() -> list[Tool]

Get tools in MCP format.

Source code in utu/tools/base.py
65
66
67
68
def get_tools_in_mcp(self) -> list[types.Tool]:
    """Return the usable tools converted to the MCP tool format."""
    converted = []
    for agent_tool in self.get_tools_in_agents():
        converted.append(MCPConverter.function_tool_to_mcp(agent_tool))
    return converted

call_tool async

call_tool(name: str, arguments: dict) -> str

Call a tool by its name.

Source code in utu/tools/base.py
70
71
72
73
74
75
76
async def call_tool(self, name: str, arguments: dict) -> str:
    """Look up the tool called ``name`` and await it with ``arguments``.

    Args:
        name: Key of the tool in the map returned by ``get_tools_map_func``.
        arguments: Keyword arguments forwarded to the tool.

    Raises:
        ValueError: If no usable tool is registered under ``name``.
    """
    registry = self.get_tools_map_func()
    if name in registry:
        handler = registry[name]
        return await handler(**arguments)
    raise ValueError(f"Tool {name} not found")