Skip to content

TabularDataToolkit

  • [ ] enhance expressiveness of the returned file structure.

TabularDataToolkit

Bases: AsyncBaseToolkit

Source code in utu/tools/tabular_data_toolkit.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class TabularDataToolkit(AsyncBaseToolkit):
    def __init__(self, config: ToolkitConfig = None):
        super().__init__(config)
        self.llm = SimplifiedAsyncOpenAI(
            **self.config.config_llm.model_provider.model_dump() if self.config.config_llm else {}
        )

    def get_tabular_columns(self, file_path: str, return_feat: list[str] = None) -> str:
        logger.info(f"[tool] get_tabular_columns: {file_path}")
        if not os.path.exists(file_path):
            return self._stringify_column_info([{"error": f"File '{file_path}' does not exist."}])

        try:
            # 1. Load the tabular data using the helper function
            df = self._load_tabular_data(file_path)
            # 2. Build column information
            column_info = []
            for col in df.columns:
                try:
                    # Get data type
                    dtype = str(df[col].dtype)

                    # Get a non-null sample value
                    sample_value = None
                    non_null_values = df[col].dropna()
                    if len(non_null_values) > 0:
                        # Get the first non-null value as sample
                        sample_value = non_null_values.iloc[0]
                        # Convert to string, handling different data types
                        if pd.isna(sample_value):
                            sample_str = "NaN"
                        elif isinstance(sample_value, float):
                            if math.isnan(sample_value):
                                sample_str = "NaN"
                            else:
                                sample_str = str(sample_value)
                        else:
                            sample_str = str(sample_value)
                    else:
                        sample_str = "No data"

                    column_info.append({"column_name": str(col), "type": dtype, "sample": sample_str})

                except Exception as e:  # pylint: disable=broad-except
                    logger.warning(f"Error processing column '{col}': {e}")
                    column_info.append({"column_name": str(col), "type": "unknown", "sample": "Error reading sample"})

            return self._stringify_column_info(column_info, return_feat=return_feat)

        except Exception as e:  # pylint: disable=broad-except
            error_msg = f"Error reading file '{file_path}': {str(e)}"
            logger.error(error_msg)
            return self._stringify_column_info([{"error": error_msg}], return_feat=return_feat)

    @register_tool
    async def get_column_info(self, file_path: str) -> str:
        """Get basic column information from a tabular data file (e.g. csv, xlsx).

        Args:
            file_path (str): Path to the tabular data file.

        Returns:
            str: Basic column information including column name, type, and sample value.
        """
        column_info_str = self.get_tabular_columns(file_path)
        prompt = TOOL_PROMPTS["tabular_column_info"].format(column_info=column_info_str)
        logger.info(f"[tool] get_column_info: {file_path}")

        response = await self.llm.query_one(
            messages=[{"role": "user", "content": prompt}],
            # **self.config.config_llm.model_params.model_dump()
        )
        return response

    def _load_tabular_data(self, file_path: str) -> pd.DataFrame:
        # Get file extension to determine how to read the file
        file_ext = pathlib.Path(file_path).suffix.lower()

        # Read the file based on its extension
        if file_ext == ".csv":
            # Try different encodings for CSV files
            encodings = ["utf-8", "latin1", "cp1252", "iso-8859-1"]
            df = None
            for encoding in encodings:
                try:
                    df = pd.read_csv(file_path, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
            if df is None:
                raise Exception("Could not read CSV file with any supported encoding")
        elif file_ext in [".xlsx", ".xls"]:
            df = pd.read_excel(file_path)
        elif file_ext == ".json":
            # Try to read JSON as tabular data
            df = pd.read_json(file_path)
        elif file_ext == ".parquet":
            df = pd.read_parquet(file_path)
        elif file_ext == ".tsv":
            # Tab-separated values
            encodings = ["utf-8", "latin1", "cp1252", "iso-8859-1"]
            df = None
            for encoding in encodings:
                try:
                    df = pd.read_csv(file_path, sep="\t", encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
            if df is None:
                raise Exception("Could not read TSV file with any supported encoding")
        else:
            # Try to read as CSV by default
            try:
                df = pd.read_csv(file_path)
            except Exception as e:  # pylint: disable=broad-except
                raise Exception(f"Unsupported file format: {file_ext}") from e

        return df

    def _stringify_column_info(self, column_info: list[dict], return_feat: list[str] = None) -> str:
        """Convert column information to a formatted string."""
        if "error" in column_info[0]:
            return column_info[0]["error"]

        lines = []
        return_keys = ["column_name", "type", "sample"]
        if return_feat:
            return_keys = [key for key in return_keys if key in return_feat]
        for i, col in enumerate(column_info):
            lines.append(
                f"- Column {i + 1}: {json.dumps({k: col[k] for k in return_keys if k in col}, ensure_ascii=False)}"
            )
        return "\n".join(lines)

tools_map property

tools_map: dict[str, Callable]

Lazy loading of tools map. - collect tools registered by @register_tool

get_column_info async

get_column_info(file_path: str) -> str

Get basic column information from a tabular data file (e.g. csv, xlsx).

Parameters:

Name Type Description Default
file_path str

Path to the tabular data file.

required

Returns:

Name Type Description
str str

Basic column information including column name, type, and sample value.

Source code in utu/tools/tabular_data_toolkit.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@register_tool
async def get_column_info(self, file_path: str) -> str:
    """Get basic column information from a tabular data file (e.g. csv, xlsx).

    Args:
        file_path (str): Path to the tabular data file.

    Returns:
        str: Basic column information including column name, type, and sample value.
    """
    column_info_str = self.get_tabular_columns(file_path)
    prompt = TOOL_PROMPTS["tabular_column_info"].format(column_info=column_info_str)
    logger.info(f"[tool] get_column_info: {file_path}")

    response = await self.llm.query_one(
        messages=[{"role": "user", "content": prompt}],
        # **self.config.config_llm.model_params.model_dump()
    )
    return response

get_tools_map_func

get_tools_map_func() -> dict[str, Callable]

Get tools map. It will filter tools by config.activated_tools if it is not None.

Source code in utu/tools/base.py
36
37
38
39
40
41
42
43
44
45
def get_tools_map_func(self) -> dict[str, Callable]:
    """Get tools map. It will filter tools by config.activated_tools if it is not None."""
    if self.config.activated_tools:
        assert all(tool_name in self.tools_map for tool_name in self.config.activated_tools), (
            f"Error config activated tools: {self.config.activated_tools}! available tools: {self.tools_map.keys()}"
        )
        tools_map = {tool_name: self.tools_map[tool_name] for tool_name in self.config.activated_tools}
    else:
        tools_map = self.tools_map
    return tools_map

get_tools_in_agents

get_tools_in_agents() -> list[FunctionTool]

Get tools in openai-agents format.

Source code in utu/tools/base.py
47
48
49
50
51
52
53
54
55
56
57
58
def get_tools_in_agents(self) -> list[FunctionTool]:
    """Get tools in openai-agents format."""
    tools_map = self.get_tools_map_func()
    tools = []
    for _, tool in tools_map.items():
        tools.append(
            function_tool(
                tool,
                strict_mode=False,  # turn off strict mode
            )
        )
    return tools

get_tools_in_openai

get_tools_in_openai() -> list[dict]

Get tools in OpenAI format.

Source code in utu/tools/base.py
60
61
62
63
def get_tools_in_openai(self) -> list[dict]:
    """Get tools in OpenAI format."""
    tools = self.get_tools_in_agents()
    return [ChatCompletionConverter.tool_to_openai(tool) for tool in tools]

get_tools_in_mcp

get_tools_in_mcp() -> list[Tool]

Get tools in MCP format.

Source code in utu/tools/base.py
65
66
67
68
def get_tools_in_mcp(self) -> list[types.Tool]:
    """Get tools in MCP format."""
    tools = self.get_tools_in_agents()
    return [MCPConverter.function_tool_to_mcp(tool) for tool in tools]

call_tool async

call_tool(name: str, arguments: dict) -> str

Call a tool by its name.

Source code in utu/tools/base.py
70
71
72
73
74
75
76
async def call_tool(self, name: str, arguments: dict) -> str:
    """Call a tool by its name."""
    tools_map = self.get_tools_map_func()
    if name not in tools_map:
        raise ValueError(f"Tool {name} not found")
    tool = tools_map[name]
    return await tool(**arguments)