Merge pull request #496 from SunFlowerUE/master

新增了一个 写入 Word 文档(.docx) 的工具模块。
This commit is contained in:
Jerry Z H
2025-10-31 15:04:55 +08:00
committed by GitHub
8 changed files with 319 additions and 42 deletions

View File

@@ -6,11 +6,13 @@
# @FileName: csv_reader.py
import csv
import io
from pathlib import Path
from typing import List, Union, Optional, Dict
from typing import List, Union, Optional, Dict, TextIO
from agentuniverse.agent.action.knowledge.reader.reader import Reader
from agentuniverse.agent.action.knowledge.store.document import Document
from agentuniverse.agent.action.knowledge.reader.utils import detect_file_encoding
class CSVReader(Reader):
@@ -20,57 +22,64 @@ class CSVReader(Reader):
"""
def _load_data(self,
file: Union[str, Path],
file: Union[str, Path, TextIO],
delimiter: str = ",",
quotechar: str = '"',
ext_info: Optional[Dict] = None) -> List[Document]:
"""Parse CSV file.
Args:
file: CSV file path or file object
delimiter: CSV delimiter, default is comma
quotechar: Quote character, default is double quote
ext_info: Additional metadata information
Returns:
List[Document]: List of documents containing CSV content
Raises:
FileNotFoundError: Raised when file does not exist
ValueError: Raised when file reading fails
"""
"""Parse CSV file."""
try:
text_stream: TextIO
should_close = False
if isinstance(file, str):
file = Path(file)
if isinstance(file, Path):
if not file.exists():
raise FileNotFoundError(f"File not found: {file}")
file_content = file.open(newline="", mode="r", encoding="utf-8")
encoding = detect_file_encoding(file)
text_stream = file.open(newline="", mode="r", encoding=encoding)
should_close = True
elif hasattr(file, "read"):
try:
file.seek(0)
except (AttributeError, OSError):
pass
raw_content = file.read()
if isinstance(raw_content, bytes):
encoding = detect_file_encoding(raw_content)
text_stream = io.StringIO(raw_content.decode(encoding))
elif isinstance(raw_content, str):
text_stream = io.StringIO(raw_content)
else:
raise ValueError("Unsupported file object type")
should_close = True
else:
file.seek(0)
file_content = file
raise TypeError("file must be a path string, Path, or file-like object")
csv_content = []
with file_content as csvfile:
csv_reader = csv.reader(csvfile, delimiter=delimiter, quotechar=quotechar)
csv_content: List[str] = []
try:
csv_reader = csv.reader(text_stream, delimiter=delimiter, quotechar=quotechar)
for row in csv_reader:
# Filter out completely empty rows
if any(cell.strip() for cell in row):
# Remove empty values at the end of row
while row and not row[-1].strip():
row.pop()
# Only add non-empty values to result
csv_content.append(", ".join(filter(None, row)))
finally:
if should_close:
text_stream.close()
# Combine all valid rows into final text
final_content = "\n".join(csv_content)
# Get metadata
metadata = {"file_name": getattr(file, 'name', 'unknown')}
if isinstance(file, Path):
file_name = file.name
else:
name_attr = getattr(file, 'name', None)
file_name = Path(name_attr).name if isinstance(name_attr, str) else 'unknown'
metadata = {"file_name": file_name}
if ext_info:
metadata.update(ext_info)
# print(f"csv_content: {final_content} \n metadata: {metadata}")
return [Document(text=final_content, metadata=metadata)]
except Exception as e:
raise ValueError(f"Failed to read CSV file: {str(e)}") from e

View File

@@ -10,8 +10,11 @@ from typing import Dict, Type, List, Optional
from agentuniverse.agent.action.knowledge.reader.file.docx_reader import DocxReader
from agentuniverse.agent.action.knowledge.reader.file.epub_reader import EpubReader
from agentuniverse.agent.action.knowledge.reader.file.markdown_reader import MarkdownReader
from agentuniverse.agent.action.knowledge.reader.file.pdf_reader import PdfReader
from agentuniverse.agent.action.knowledge.reader.file.pptx_reader import PptxReader
from agentuniverse.agent.action.knowledge.reader.file.txt_reader import TxtReader
from agentuniverse.agent.action.knowledge.reader.file.csv_reader import CSVReader
from agentuniverse.agent.action.knowledge.reader.file.rar_reader import RarReader
from agentuniverse.agent.action.knowledge.reader.file.xlsx_reader import XlsxReader
from agentuniverse.agent.action.knowledge.reader.file.zip_reader import ZipReader
@@ -24,6 +27,10 @@ DEFAULT_FILE_READERS: Dict[str, Type[Reader]] = {
".pptx": PptxReader,
".xlsx": XlsxReader,
".epub": EpubReader,
".txt": TxtReader,
".md": MarkdownReader,
".markdown": MarkdownReader,
".csv": CSVReader,
".rar": RarReader,
".zip": ZipReader,
}

View File

@@ -1,19 +1,19 @@
from pathlib import Path
from typing import List, Optional, Dict
from agentuniverse.agent.action.knowledge.reader.reader import Reader
from agentuniverse.agent.action.knowledge.store.document import Document
from agentuniverse.agent.action.knowledge.reader.utils import detect_file_encoding
class LineTxtReader(Reader):
def _load_data(self, fpath: Path, ext_info: Optional[Dict] = None) -> List[Document]:
dlist = []
dlist: List[Document] = []
encoding = detect_file_encoding(fpath)
with open(fpath, 'r', encoding='utf-8') as file:
metadata = {"file_name": file.name}
with open(fpath, 'r', encoding=encoding) as file:
metadata = {"file_name": Path(file.name).name}
if ext_info is not None:
metadata.update(ext_info)
@@ -27,10 +27,10 @@ class TxtReader(Reader):
"""Txt reader."""
def _load_data(self, fpath: Path, ext_info: Optional[Dict] = None) -> List[Document]:
encoding = detect_file_encoding(fpath)
with open(fpath, 'r', encoding='utf-8') as file:
metadata = {"file_name": file.name}
with open(fpath, 'r', encoding=encoding) as file:
metadata = {"file_name": Path(file.name).name}
if ext_info is not None:
metadata.update(ext_info)

View File

@@ -21,6 +21,9 @@ class ReaderManager(ComponentManagerBase[Reader]):
"pptx": "default_pptx_reader",
"docx": "default_docx_reader",
"txt": "default_txt_reader",
"md": "default_markdown_reader",
"markdown": "default_markdown_reader",
"csv": "default_csv_reader",
"rar": "default_rar_reader",
"zip": "default_zip_reader",
# extended defaults for web & images

View File

@@ -0,0 +1,69 @@
"""Utility helpers for reader implementations."""
from __future__ import annotations
from pathlib import Path
from typing import BinaryIO, Iterable, Sequence, Union
# Candidate encodings that detect_file_encoding tries, in this order, against a
# byte sample; the first one that decodes without error is returned, so the
# order expresses priority.
# NOTE: "latin-1" maps every byte value to a character and therefore never
# fails to decode - it is the catch-all and must stay last.
# NOTE(review): any sample that decodes as "utf-8-sig" also decodes as "utf-8",
# so with "utf-8" listed first the "utf-8-sig" entry looks unreachable through
# this list alone - confirm whether BOM handling is expected elsewhere.
_FALLBACK_ENCODINGS: Sequence[str] = (
    "utf-8",
    "utf-8-sig",
    "gb18030",
    "gbk",
    "big5",
    "shift_jis",
    "latin-1",
)
def _read_sample_bytes(source: Union[str, Path, BinaryIO, bytes, bytearray],
sample_size: int) -> bytes:
"""Read a byte sample from the given file path or binary handle."""
if isinstance(source, (bytes, bytearray)):
return bytes(source[:sample_size])
if isinstance(source, (str, Path)):
path = Path(source)
with path.open("rb") as handle:
return handle.read(sample_size)
# File-like object preserve the original pointer
handle = source
current_pos = handle.tell()
try:
data = handle.read(sample_size)
finally:
handle.seek(current_pos)
return data if data is not None else b""
def detect_file_encoding(source: Union[str, Path, BinaryIO, bytes, bytearray],
                         sample_size: int = 32 * 1024,
                         fallback_encodings: Iterable[str] = _FALLBACK_ENCODINGS) -> str:
    """Best-effort detection of the text encoding for the given file.

    Detection proceeds in this order:

    1. An empty sample yields ``"utf-8"``.
    2. A leading byte-order mark selects ``"utf-8-sig"``, ``"utf-32"`` or
       ``"utf-16"``; these codecs consume the BOM, so it does not leak into
       the decoded text.
    3. Each entry of ``fallback_encodings`` is tried in order and the first
       one that decodes the sample is returned.
    4. If none matched and ``charset_normalizer`` is installed, its best
       guess is returned.
    5. Otherwise ``"utf-8"`` is returned.

    With the default candidate list, ``"latin-1"`` accepts any byte sequence,
    so steps 4 and 5 are only reached when a caller supplies a list without
    such a catch-all.

    Args:
        source: File path, binary handle, or raw bytes to inspect.
        sample_size: Maximum number of bytes to examine.
        fallback_encodings: Candidate encoding names, highest priority first.

    Returns:
        The name of the detected (or default) encoding.

    Raises:
        LookupError: If ``fallback_encodings`` contains an unknown codec name
            and no earlier candidate matched.
    """
    # Function-scope import so this block does not depend on the module's
    # import list.
    import codecs

    sample = _read_sample_bytes(source, sample_size)
    if not sample:
        return "utf-8"

    # BOM sniffing comes first: plain "utf-8" would also decode a BOM-prefixed
    # sample, but would keep U+FEFF as the first character of the text.
    # The UTF-32 LE mark starts with the UTF-16 LE mark, so test UTF-32 first.
    bom_table = (
        (codecs.BOM_UTF8, "utf-8-sig"),
        (codecs.BOM_UTF32_LE, "utf-32"),
        (codecs.BOM_UTF32_BE, "utf-32"),
        (codecs.BOM_UTF16_LE, "utf-16"),
        (codecs.BOM_UTF16_BE, "utf-16"),
    )
    for bom, bom_encoding in bom_table:
        if sample.startswith(bom):
            return bom_encoding

    # When the sample fills the whole window, the input was probably cut off
    # and the cut may fall inside a multi-byte character. Strict decoding
    # would then reject the correct encoding, so tolerate an incomplete
    # trailing sequence by decoding incrementally with final=False.
    truncated = len(sample) >= sample_size

    for encoding in fallback_encodings:
        decoder_factory = None
        if truncated:
            try:
                decoder_factory = codecs.getincrementaldecoder(encoding)
            except LookupError:
                # No incremental decoder (or unknown codec): fall back to the
                # one-shot decode below, which reports unknown names as before.
                decoder_factory = None
        try:
            if decoder_factory is not None:
                decoder_factory().decode(sample, final=False)
            else:
                sample.decode(encoding)
        except UnicodeDecodeError:
            continue
        return encoding

    # If the curated list fails, fall back to charset_normalizer if available
    try:  # pragma: no cover - optional dependency
        from charset_normalizer import from_bytes
    except ImportError:  # pragma: no cover - handled above
        best_guess = None
    else:
        result = from_bytes(sample).best()
        best_guess = result.encoding if result is not None else None

    if best_guess:
        return best_guess
    return "utf-8"
__all__ = ["detect_file_encoding"]

View File

@@ -0,0 +1,62 @@
import os
import json
from typing import Any, Dict
from agentuniverse.agent.action.tool.tool import Tool
class WriteWordDocumentTool(Tool):
    """Tool that writes a paragraph of text into a Word (.docx) file.

    The document is produced with the optional ``python-docx`` package, which
    is imported lazily inside ``execute``. Outcomes are reported to the caller
    as a JSON string rather than by raising.
    """

    def execute(self, file_path: str, content: str = "", append: bool = False) -> str:
        """Add ``content`` as one paragraph to the document at ``file_path``.

        All validation happens before the filesystem is touched, so a rejected
        request (wrong extension, missing python-docx) leaves no stray
        directories behind.

        Args:
            file_path: Destination path; must end with ``.docx``
                (case-insensitive). Missing parent directories are created.
            content: Text of the paragraph to add.
            append: When True and ``file_path`` already exists, the paragraph
                is added to the existing document. Otherwise a fresh document
                is created and saved to ``file_path``.

        Returns:
            A JSON string. On success it contains ``file_path``,
            ``bytes_written`` (UTF-8 size of ``content``), ``file_size``
            (size of the saved file), ``append_mode`` and
            ``status == "success"``. On failure it contains ``error``,
            ``file_path`` and ``status == "error"``.
        """

        def error_response(message: str) -> str:
            # Single place that defines the shape of every failure payload.
            return json.dumps({"error": message, "file_path": file_path, "status": "error"})

        if not file_path.lower().endswith(".docx"):
            return error_response("The target file must have a .docx extension.")

        try:
            from docx import Document  # type: ignore
        except ImportError as e:
            return error_response(f"python-docx is required to write Word documents: {str(e)}")

        directory = os.path.dirname(file_path)
        if directory:
            try:
                # exist_ok=True already covers the "directory exists" case.
                os.makedirs(directory, exist_ok=True)
            except Exception as e:
                return error_response(f"Failed to create directory: {str(e)}")

        if append and os.path.exists(file_path):
            try:
                document = Document(file_path)
            except Exception as e:
                return error_response(f"Failed to load existing document: {str(e)}")
        else:
            document = Document()

        # Broad catch is deliberate: this is the tool boundary and the caller
        # expects a JSON status instead of an exception.
        try:
            document.add_paragraph(content)
            document.save(file_path)
            file_size = os.path.getsize(file_path)
        except Exception as e:
            return error_response(f"Failed to write document: {str(e)}")

        return json.dumps(
            {
                "file_path": file_path,
                "bytes_written": len(content.encode("utf-8")),
                "file_size": file_size,
                "append_mode": append,
                "status": "success",
            }
        )

View File

@@ -0,0 +1,38 @@
from agentuniverse.agent.action.knowledge.reader.file.csv_reader import CSVReader
from agentuniverse.agent.action.knowledge.reader.file.txt_reader import TxtReader
from agentuniverse.agent.action.knowledge.reader.utils import detect_file_encoding
def test_detect_file_encoding_gb18030(tmp_path):
    """A file written as GB18030 must be detected as GB18030 or GBK."""
    target = tmp_path / "sample.txt"
    target.write_text("示例文本", encoding="gb18030")

    accepted = {"gb18030", "gbk"}
    assert detect_file_encoding(target) in accepted
def test_txt_reader_handles_gbk(tmp_path):
    """TxtReader must return the text of a GB18030-encoded file unchanged, as one document."""
    expected_text = "第一行\n第二行"
    source_file = tmp_path / "gbk.txt"
    source_file.write_text(expected_text, encoding="gb18030")

    loaded = TxtReader().load_data(source_file)

    assert len(loaded) == 1
    only_doc = loaded[0]
    assert only_doc.text == expected_text
    assert only_doc.metadata["file_name"] == source_file.name
def test_csv_reader_handles_utf8_bom(tmp_path):
    """CSVReader must load a BOM-prefixed UTF-8 CSV into a single document."""
    csv_lines = ["col1,col2", "值1,值2"]
    csv_file = tmp_path / "data.csv"
    # "utf-8-sig" makes write_text emit a leading byte-order mark.
    csv_file.write_text("\n".join(csv_lines), encoding="utf-8-sig")

    loaded = CSVReader().load_data(csv_file)

    assert len(loaded) == 1
    only_doc = loaded[0]
    assert "值1" in only_doc.text
    assert only_doc.metadata["file_name"] == csv_file.name

View File

@@ -0,0 +1,89 @@
import os
import json
import tempfile
import unittest
from agentuniverse.agent.action.tool.common_tool.write_word_tool import WriteWordDocumentTool
class WriteWordDocumentToolTest(unittest.TestCase):
    """Tests for WriteWordDocumentTool.execute, run against a per-test temp directory."""

    def setUp(self):
        self.tool = WriteWordDocumentTool()
        self.temp_dir = tempfile.mkdtemp()

    def tearDown(self):
        # Function-scope import keeps this change independent of the module's
        # import list.
        import shutil

        shutil.rmtree(self.temp_dir)

    def test_write_new_word_file(self):
        """Writing to a fresh path succeeds and creates the file."""
        file_path = os.path.join(self.temp_dir, "test_new.docx")
        content = "***This is a test paragraph.***"

        result_json = self.tool.execute(file_path=file_path, content=content, append=False)
        result = json.loads(result_json)

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["file_path"], file_path)
        self.assertTrue(os.path.exists(file_path))

    def test_append_to_word_file(self):
        """Appending keeps the existing paragraph and adds the new one."""
        file_path = os.path.join(self.temp_dir, "test_append.docx")
        initial_content = "Initial paragraph."
        append_content = "Appended paragraph."

        first = json.loads(self.tool.execute(file_path=file_path, content=initial_content, append=False))
        self.assertEqual(first["status"], "success")

        result_json = self.tool.execute(file_path=file_path, content=append_content, append=True)
        result = json.loads(result_json)

        self.assertEqual(result["status"], "success")
        self.assertEqual(result["append_mode"], True)

        # Check the saved document itself, not just the reported status.
        from docx import Document  # type: ignore

        paragraph_texts = [paragraph.text for paragraph in Document(file_path).paragraphs]
        self.assertIn(initial_content, paragraph_texts)
        self.assertIn(append_content, paragraph_texts)

    def test_invalid_file_extension(self):
        """A non-.docx target is rejected with an error and nothing is written."""
        file_path = os.path.join(self.temp_dir, "invalid_file.txt")
        content = "This should fail."

        result_json = self.tool.execute(file_path=file_path, content=content, append=False)
        result = json.loads(result_json)

        self.assertEqual(result["status"], "error")
        self.assertIn("The target file must have a .docx extension.", result["error"])
        self.assertFalse(os.path.exists(file_path))

    def test_create_directory_structure(self):
        """Missing parent directories are created on demand."""
        file_path = os.path.join(self.temp_dir, "nested/dir/structure/test.docx")
        content = "Test content in nested directory."

        result_json = self.tool.execute(file_path=file_path, content=content, append=False)
        result = json.loads(result_json)

        self.assertEqual(result["status"], "success")
        self.assertTrue(os.path.exists(file_path))
        self.assertTrue(os.path.isdir(os.path.join(self.temp_dir, "nested/dir/structure")))

    def test_missing_dependency(self):
        """When python-docx cannot be imported, the tool reports an error."""
        import builtins
        from unittest import mock

        real_import = builtins.__import__

        def fake_import(name, *args, **kwargs):
            if name == "docx":
                raise ImportError("No module named 'docx'")
            return real_import(name, *args, **kwargs)

        file_path = os.path.join(self.temp_dir, "test_missing_dependency.docx")
        content = "This should fail due to missing dependency."

        # Patch the attribute on the builtins module: the bare ``__builtins__``
        # name is a dict in imported modules but a module in ``__main__``, so
        # item assignment on it breaks when this file is run as a script.
        with mock.patch.object(builtins, "__import__", fake_import):
            result_json = self.tool.execute(file_path=file_path, content=content, append=False)

        result = json.loads(result_json)
        self.assertEqual(result["status"], "error")
        self.assertIn("python-docx is required to write Word documents", result["error"])
if __name__ == "__main__":
unittest.main()