diff --git a/agentuniverse/agent/action/tool/common_tool/write_file_tool.py b/agentuniverse/agent/action/tool/common_tool/write_file_tool.py new file mode 100644 index 00000000..8e295208 --- /dev/null +++ b/agentuniverse/agent/action/tool/common_tool/write_file_tool.py @@ -0,0 +1,51 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2025/3/22 19:15 +# @Author : hiro +# @Email : hiromesh@qq.com +# @FileName: write_file_tool.py + +import os +import json + +from agentuniverse.agent.action.tool.tool import Tool, ToolInput + + +class WriteFileTool(Tool): + def execute(self, tool_input: ToolInput) -> str: + file_path = tool_input.get_data("file_path") + content = tool_input.get_data("content", "") + append = tool_input.get_data("append", False) + + directory = os.path.dirname(file_path) + if directory and not os.path.exists(directory): + try: + os.makedirs(directory, exist_ok=True) + except Exception as e: + return json.dumps({ + "error": f"Failed to create directory: {str(e)}", + "file_path": file_path, + "status": "error" + }) + + try: + mode = 'a' if append else 'w' + + with open(file_path, mode, encoding='utf-8') as file: + file.write(content) + file_size = os.path.getsize(file_path) + return json.dumps({ + "file_path": file_path, + "bytes_written": len(content.encode('utf-8')), + "file_size": file_size, + "append_mode": append, + "status": "success" + }) + + except Exception as e: + return json.dumps({ + "error": str(e), + "file_path": file_path, + "status": "error" + }) diff --git a/tests/test_agentuniverse/unit/agent/action/tool/test_write_file.py b/tests/test_agentuniverse/unit/agent/action/tool/test_write_file.py new file mode 100644 index 00000000..f0d587b7 --- /dev/null +++ b/tests/test_agentuniverse/unit/agent/action/tool/test_write_file.py @@ -0,0 +1,95 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2025/03/22 19:16 +# @Author : hiro +# @Email : hiromesh@qq.com +# @FileName: test_write_file.py + +import os +import json +import tempfile +import unittest + +from agentuniverse.agent.action.tool.tool import ToolInput +from agentuniverse.agent.action.tool.common_tool.write_file_tool import WriteFileTool + + +class WriteFileToolTest(unittest.TestCase): + def setUp(self): + self.tool = WriteFileTool() + self.temp_dir = tempfile.mkdtemp() + + def tearDown(self): + for root, dirs, files in os.walk(self.temp_dir, topdown=False): + for name in files: + os.unlink(os.path.join(root, name)) + for name in dirs: + os.rmdir(os.path.join(root, name)) + os.rmdir(self.temp_dir) + + def test_write_new_file(self): + file_path = os.path.join(self.temp_dir, 'test_new.txt') + content = "This is a test file content" + + tool_input = ToolInput({ + 'file_path': file_path, + 'content': content + }) + + result_json = self.tool.execute(tool_input) + result = json.loads(result_json) + + self.assertEqual(result['status'], 'success') + self.assertEqual(result['file_path'], file_path) + self.assertTrue(os.path.exists(file_path)) + + with open(file_path, 'r') as f: + self.assertEqual(f.read(), content) + + def test_append_to_file(self): + file_path = os.path.join(self.temp_dir, 'test_append.txt') + + initial_content = "Initial content\n" + tool_input = ToolInput({ + 'file_path': file_path, + 'content': initial_content + }) + self.tool.execute(tool_input) + + append_content = "Appended content" + tool_input = ToolInput({ + 'file_path': file_path, + 'content': append_content, + 'append': True + }) + + result_json = self.tool.execute(tool_input) + result = json.loads(result_json) + + self.assertEqual(result['status'], 'success') + self.assertEqual(result['append_mode'], True) + + with open(file_path, 'r') as f: + self.assertEqual(f.read(), initial_content + append_content) + + def test_create_directory_structure(self): + file_path = os.path.join(self.temp_dir, 'nested/dir/structure/test.txt') + content = "Test content in nested directory" + + tool_input = ToolInput({ + 'file_path': file_path, + 'content': content + }) + + result_json = self.tool.execute(tool_input) + result = json.loads(result_json) + + self.assertEqual(result['status'], 'success') + self.assertTrue(os.path.exists(file_path)) + + self.assertTrue(os.path.isdir(os.path.join(self.temp_dir, 'nested/dir/structure'))) + + +if __name__ == '__main__': + unittest.main()