diff --git a/tests/test_agentuniverse/unit/agent/action/knowledge/reader/file/test_sevenzip_reader.py b/tests/test_agentuniverse/unit/agent/action/knowledge/reader/file/test_sevenzip_reader.py new file mode 100644 index 00000000..203e3604 --- /dev/null +++ b/tests/test_agentuniverse/unit/agent/action/knowledge/reader/file/test_sevenzip_reader.py @@ -0,0 +1,627 @@ +import unittest +import tempfile +import os +import shutil +import time +from pathlib import Path +from agentuniverse.agent.action.knowledge.reader.file.sevenzip_reader import SevenZipReader + +class TestSevenZipReaderBasic(unittest.TestCase): + """SevenZipReader 基础功能测试""" + def setUp(self): + self.reader = SevenZipReader() + self.temp_dir = tempfile.mkdtemp() + def tearDown(self): + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + def create_test_files(self): + test_txt_path = os.path.join(self.temp_dir, "test.txt") + with open(test_txt_path, 'w', encoding='utf-8') as f: + f.write("这是一个测试文本文件,用于验证 7Z 读取器功能。") + test_py_path = os.path.join(self.temp_dir, "test.py") + with open(test_py_path, 'w', encoding='utf-8') as f: + f.write("""#!/usr/bin/env python3 + def hello(): + print('Hello, 7Z Reader!') + def process_data(data): + return [x * 2 for x in data]""") + test_json_path = os.path.join(self.temp_dir, "config.json") + with open(test_json_path, 'w', encoding='utf-8') as f: + f.write("""{"app_name": "7Z Reader Test", "version": "1.0.0", "features": ["compression", "extraction", "metadata"]}""") + test_yaml_path = os.path.join(self.temp_dir, "settings.yaml") + with open(test_yaml_path, 'w', encoding='utf-8') as f: + f.write("""app:\n name: "7Z Test Application"\n debug: true\ndatabase:\n host: "localhost"\n port: 5432""") + return [test_txt_path, test_py_path, test_json_path, test_yaml_path] + def create_test_7z(self, files=None, password=None): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available for testing") + if files is None: + files = self.create_test_files() + test_7z_path = os.path.join(self.temp_dir, "test_archive.7z") + try: + if password: + with py7zr.SevenZipFile(test_7z_path, 'w', password=password) as archive: + for file_path in files: + archive.write(file_path, os.path.basename(file_path)) + else: + with py7zr.SevenZipFile(test_7z_path, 'w') as archive: + for file_path in files: + archive.write(file_path, os.path.basename(file_path)) + except Exception as e: + self.skipTest(f"Failed to create 7Z archive: {e}") + return test_7z_path + def test_load_data_success(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available for testing") + test_7z_path = self.create_test_7z() + documents = self.reader._load_data(test_7z_path) + self.assertIsInstance(documents, list) + self.assertGreater(len(documents), 0) + file_names = [doc.metadata.get('file_name') for doc in documents] + self.assertIn('test.txt', file_names) + self.assertIn('test.py', file_names) + self.assertIn('config.json', file_names) + def test_load_data_with_custom_metadata(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available for testing") + test_7z_path = self.create_test_7z() + custom_metadata = {"source": "test_suite", "version": "2.0.0", "compression_format": "7Z", "test_scenario": "metadata_validation"} + documents = self.reader._load_data(test_7z_path, ext_info=custom_metadata) + self.assertGreater(len(documents), 0) + doc = documents[0] + self.assertEqual(doc.metadata['source'], 'test_suite') + self.assertEqual(doc.metadata['version'], '2.0.0') + self.assertEqual(doc.metadata['compression_format'], '7Z') + def test_load_data_file_not_found(self): + non_existent_file = os.path.join(self.temp_dir, "non_existent.7z") + with self.assertRaises(FileNotFoundError): + self.reader._load_data(non_existent_file) + def test_metadata_structure(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available for testing") + test_7z_path = self.create_test_7z() + documents = self.reader._load_data(test_7z_path) + self.assertGreater(len(documents), 0) + doc = documents[0] + required_fields = ['file_name', 'file_path', 'file_suffix', 'archive_root', 'archive_path', 'archive_depth'] + for field in required_fields: + self.assertIn(field, doc.metadata) + self.assertEqual(doc.metadata['archive_depth'], 0) + def test_content_extraction_accuracy(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available for testing") + test_content = "这是用于验证内容提取准确性的测试文本。" + test_file = os.path.join(self.temp_dir, "content_test.txt") + with open(test_file, 'w', encoding='utf-8') as f: + f.write(test_content) + test_7z_path = self.create_test_7z(files=[test_file]) + documents = self.reader._load_data(test_7z_path) + self.assertEqual(len(documents), 1) + self.assertEqual(documents[0].text, test_content) + +class TestSevenZipReaderComplexScenarios(unittest.TestCase): + """SevenZipReader 复杂场景测试""" + def setUp(self): + self.reader = SevenZipReader() + self.temp_dir = tempfile.mkdtemp() + def tearDown(self): + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + def create_complex_project_structure(self): + project_dir = os.path.join(self.temp_dir, "complex_project") + os.makedirs(project_dir, exist_ok=True) + directories = ["src/utils", "src/models", "tests/unit", "tests/integration", "docs/api", "docs/tutorials", "config/environments", "data/raw", "data/processed", "logs/applications"] + for directory in directories: + os.makedirs(os.path.join(project_dir, directory), exist_ok=True) + with open(os.path.join(project_dir, "README.md"), 'w', encoding='utf-8') as f: + f.write("""# 复杂 7Z 项目\n这是一个用于测试 SevenZipReader 复杂场景的项目结构。\n## 功能特性\n- 多层目录结构\n- 多种文件格式支持\n- 完整的开发环境模拟""") + with open(os.path.join(project_dir, "src", "main.py"), 'w', encoding='utf-8') as f: + f.write("""#!/usr/bin/env python3\nimport sys\nimport json\nfrom utils.helpers import load_config\nfrom models.processor import DataProcessor\n\ndef main():\n config = load_config()\n processor = DataProcessor(config)\n try:\n data = processor.load_data()\n result = processor.process(data)\n processor.save_result(result)\n print("Processing completed successfully")\n except Exception as e:\n print(f"Error: {e}")\n sys.exit(1)\n\nif __name__ == "__main__":\n main()""") + with open(os.path.join(project_dir, "src", "utils", "helpers.py"), 'w', encoding='utf-8') as f: + f.write("""import yaml\nimport json\n\ndef load_config():\n with open('config/environments/development.yaml', 'r') as f:\n return yaml.safe_load(f)\n\ndef setup_logging():\n import logging\n logging.basicConfig(level=logging.INFO)\n return logging.getLogger(__name__)""") + with open(os.path.join(project_dir, "src", "models", "processor.py"), 'w', encoding='utf-8') as f: + f.write("""class DataProcessor:\n def __init__(self, config):\n self.config = config\n self.logger = None\n def load_data(self):\n import pandas as pd\n return pd.DataFrame({'id': [1, 2, 3], 'value': [100, 200, 300]})\n def process(self, data):\n data['processed'] = data['value'] * 2\n return data\n def save_result(self, result):\n result.to_csv('data/processed/result.csv', index=False)""") + with open(os.path.join(project_dir, "tests", "unit", "test_processor.py"), 'w', encoding='utf-8') as f: + f.write("""import unittest\nfrom src.models.processor import DataProcessor\n\nclass TestDataProcessor(unittest.TestCase):\n def setUp(self):\n self.config = {'debug': True}\n self.processor = DataProcessor(self.config)\n def test_processor_initialization(self):\n self.assertIsNotNone(self.processor)\n self.assertEqual(self.processor.config, self.config)""") + with open(os.path.join(project_dir, "tests", "integration", "test_main.py"), 'w', encoding='utf-8') as f: + f.write("""import unittest\nimport sys\nimport os\nsys.path.append(os.path.join(os.path.dirname(__file__), '../../src'))\n\nclass TestMainIntegration(unittest.TestCase):\n def test_import_modules(self):\n try:\n from main import main\n from utils.helpers import load_config\n self.assertTrue(True)\n except ImportError as e:\n self.fail(f"Import failed: {e}")""") + with open(os.path.join(project_dir, "config", "environments", "development.yaml"), 'w', encoding='utf-8') as f: + f.write("""environment: "development"\ndebug: true\ndatabase:\n host: "localhost"\n port: 5432\n name: "dev_db"\nlogging:\n level: "DEBUG"\n file: "logs/applications/app.log" """) + with open(os.path.join(project_dir, "config", "environments", "production.yaml"), 'w', encoding='utf-8') as f: + f.write("""environment: "production"\ndebug: false\ndatabase:\n host: "db.production.com"\n port: 5432\n name: "prod_db"\nlogging:\n level: "WARNING"\n file: "/var/log/applications/app.log" """) + with open(os.path.join(project_dir, "docs", "api", "rest_api.md"), 'w', encoding='utf-8') as f: + f.write("""# REST API 文档\n## 用户端点\n### GET /api/users\n获取用户列表\n### POST /api/users\n创建新用户\n## 数据端点\n### GET /api/data\n获取数据""") + with open(os.path.join(project_dir, "docs", "tutorials", "getting_started.md"), 'w', encoding='utf-8') as f: + f.write("""# 入门教程\n## 安装依赖\n```bash\npip install -r requirements.txt\n```\n## 运行应用\n```bash\npython src/main.py\n```""") + with open(os.path.join(project_dir, "data", "raw", "sample_data.json"), 'w', encoding='utf-8') as f: + f.write("""[{"id": 1, "name": "项目A", "value": 100}, {"id": 2, "name": "项目B", "value": 200}, {"id": 3, "name": "项目C", "value": 300}]""") + with open(os.path.join(project_dir, "data", "raw", "users.csv"), 'w', encoding='utf-8') as f: + f.write("""id,username,email,department\n1,alice,alice@example.com,Engineering\n2,bob,bob@example.com,Marketing\n3,charlie,charlie@example.com,Sales""") + return project_dir + def create_nested_7z_structure(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + base_dir = os.path.join(self.temp_dir, "nested") + os.makedirs(base_dir, exist_ok=True) + level3_dir = os.path.join(base_dir, "level3") + os.makedirs(level3_dir, exist_ok=True) + with open(os.path.join(level3_dir, "deep_config.json"), 'w', encoding='utf-8') as f: + f.write("""{"level": 3, "description": "最深层的配置文件", "security": {"encryption": true, "access_level": "high"}}""") + with open(os.path.join(level3_dir, "secret_data.txt"), 'w', encoding='utf-8') as f: + f.write("这是嵌套在最深层的敏感数据文件。") + level3_7z = os.path.join(base_dir, "level3_archive.7z") + try: + with py7zr.SevenZipFile(level3_7z, 'w') as archive: + archive.writeall(level3_dir, 'level3') + except Exception as e: + self.skipTest(f"Failed to create level3 7Z: {e}") + level2_dir = os.path.join(base_dir, "level2") + os.makedirs(level2_dir, exist_ok=True) + with open(os.path.join(level2_dir, "middle_document.md"), 'w', encoding='utf-8') as f: + f.write("""# 中层文档\n这是第二层的文档文件。\n## 包含内容\n- 业务逻辑说明\n- 下一层的压缩包\n- 配置信息""") + shutil.copy(level3_7z, level2_dir) + with open(os.path.join(level2_dir, "business_rules.yaml"), 'w', encoding='utf-8') as f: + f.write("""rules:\n - name: "数据验证规则"\n condition: "data.value > 0"\n action: "accept"\n - name: "安全规则"\n condition: "user.role == 'admin'"\n action: "grant_access" """) + level2_7z = os.path.join(base_dir, "level2_archive.7z") + try: + with py7zr.SevenZipFile(level2_7z, 'w') as archive: + archive.writeall(level2_dir, 'level2') + except Exception as e: + self.skipTest(f"Failed to create level2 7Z: {e}") + level1_dir = os.path.join(base_dir, "level1") + os.makedirs(level1_dir, exist_ok=True) + with open(os.path.join(level1_dir, "project_overview.txt"), 'w', encoding='utf-8') as f: + f.write("7Z 嵌套压缩包演示项目\n\n本项目展示了 SevenZipReader 处理多层嵌套 7Z 压缩包的能力。") + shutil.copy(level2_7z, level1_dir) + with open(os.path.join(level1_dir, "system_config.xml"), 'w', encoding='utf-8') as f: + f.write("""\n\n 7Z嵌套演示系统\n 2.0.0\n \n 数据压缩模块\n 配置管理模块\n 嵌套处理模块\n \n""") + final_7z = os.path.join(self.temp_dir, "nested_project.7z") + try: + with py7zr.SevenZipFile(final_7z, 'w') as archive: + archive.writeall(level1_dir, 'level1') + except Exception as e: + self.skipTest(f"Failed to create final nested 7Z: {e}") + return final_7z + def test_complex_project_structure(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + project_dir = self.create_complex_project_structure() + sevenzip_path = os.path.join(self.temp_dir, "complex_project.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(project_dir, '') + except Exception as e: + self.skipTest(f"Failed to create project 7Z: {e}") + documents = self.reader._load_data(sevenzip_path) + self.assertGreater(len(documents), 10) + file_types = set(doc.metadata.get('file_suffix') for doc in documents) + expected_types = {'.py', '.yaml', '.md', '.json', '.csv'} + for expected_type in expected_types: + self.assertIn(expected_type, file_types) + file_names = [doc.metadata.get('file_name') for doc in documents] + self.assertIn('main.py', file_names) + self.assertIn('development.yaml', file_names) + self.assertIn('sample_data.json', file_names) + def test_nested_7z_archives(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + nested_7z = self.create_nested_7z_structure() + documents = self.reader._load_data(nested_7z, max_depth=4) + self.assertGreater(len(documents), 0) + depths = set(doc.metadata.get('archive_depth') for doc in documents) + self.assertTrue(len(depths) > 1) + deep_files = [doc for doc in documents if doc.metadata.get('archive_depth', 0) > 1] + self.assertGreater(len(deep_files), 0) + def test_multiple_file_types_and_encodings(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + files_dir = os.path.join(self.temp_dir, "mixed_files") + os.makedirs(files_dir, exist_ok=True) + files_content = { + "document.txt": "这是 UTF-8 编码的文本文件内容", + "script.py": "#!/usr/bin/env python3\nprint('Python 脚本文件')", + "data.json": '{"类型": "测试", "数值": 123, "数组": [1, 2, 3]}', + "config.yaml": "应用:\n 名称: 测试应用\n 端口: 8080\n调试: true", + "code.js": "// JavaScript 文件\nconsole.log('Hello, 7Z Reader!');", + "style.css": "/* CSS 文件 */\nbody { margin: 0; font-family: Arial; }", + "page.html": "\n\n\n 测试页面\n\n\n

Hello, 7Z Reader!

\n\n", + "database.sql": "-- SQL 脚本\nCREATE TABLE users (\n id INT PRIMARY KEY,\n name VARCHAR(100),\n email VARCHAR(255)\n);", + "log_file.log": "[2024-01-01 10:00:00] INFO: 应用程序启动\n[2024-01-01 10:00:01] DEBUG: 加载配置", + "special_中文文件.md": "# 包含中文文件名的文件\n这是测试中文文件名支持的文件。" + } + for filename, content in files_content.items(): + with open(os.path.join(files_dir, filename), 'w', encoding='utf-8') as f: + f.write(content) + sevenzip_path = os.path.join(self.temp_dir, "mixed_files.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(files_dir, '') + except Exception as e: + self.skipTest(f"Failed to create mixed files 7Z: {e}") + documents = self.reader._load_data(sevenzip_path) + self.assertEqual(len(documents), len(files_content)) + found_suffixes = set(doc.metadata.get('file_suffix') for doc in documents) + expected_suffixes = {'.txt', '.py', '.json', '.yaml', '.js', '.css', '.html', '.sql', '.log', '.md'} + for suffix in expected_suffixes: + self.assertIn(suffix, found_suffixes) + chinese_files = [doc for doc in documents if '中文' in doc.metadata.get('file_name', '')] + self.assertGreater(len(chinese_files), 0) + +class TestSevenZipReaderSizeLimits(unittest.TestCase): + """SevenZipReader 大小限制测试""" + def setUp(self): + self.reader = SevenZipReader() + self.temp_dir = tempfile.mkdtemp() + def tearDown(self): + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + def test_max_file_size_limit(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + large_file = os.path.join(self.temp_dir, "large_data.txt") + with open(large_file, 'w', encoding='utf-8') as f: + for i in range(20000): + f.write(f"Line {i}: 这是用于测试大文件处理的数据行,包含一些变化内容以避免过度压缩。 {i * 123}\n") + sevenzip_path = os.path.join(self.temp_dir, "large_file.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.write(large_file, 'large_data.txt') + except Exception as e: + self.skipTest(f"Failed to create large file 7Z: {e}") + documents = self.reader._load_data(sevenzip_path, max_file_size=100 * 1024) + self.assertIsInstance(documents, list) + self.assertLessEqual(len(documents), 1) + def test_max_total_size_limit(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + files = [] + for i in range(5): + filepath = os.path.join(self.temp_dir, f"data_file_{i}.txt") + with open(filepath, 'w', encoding='utf-8') as f: + content = f"文件 {i} 的内容:\n" + content += "这是一些测试数据 " * 1000 + f.write(content) + files.append(filepath) + sevenzip_path = os.path.join(self.temp_dir, "multiple_files.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + for file_path in files: + archive.write(file_path, os.path.basename(file_path)) + except Exception as e: + self.skipTest(f"Failed to create multiple files 7Z: {e}") + documents = self.reader._load_data(sevenzip_path, max_total_size=100 * 1024) + self.assertIsInstance(documents, list) + self.assertLess(len(documents), len(files)) + def test_max_files_limit(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + files_dir = os.path.join(self.temp_dir, "many_files") + os.makedirs(files_dir, exist_ok=True) + file_count = 25 + for i in range(file_count): + filepath = os.path.join(files_dir, f"small_file_{i:02d}.txt") + with open(filepath, 'w', encoding='utf-8') as f: + f.write(f"这是小文件 {i} 的内容。") + sevenzip_path = os.path.join(self.temp_dir, "many_files.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(files_dir, '') + except Exception as e: + self.skipTest(f"Failed to create many files 7Z: {e}") + documents = self.reader._load_data(sevenzip_path, max_files=10) + self.assertLessEqual(len(documents), 10) + def test_compression_ratio_detection(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + test_file = os.path.join(self.temp_dir, "highly_compressible.txt") + with open(test_file, 'w', encoding='utf-8') as f: + f.write("0" * (100 * 1024)) + sevenzip_path = os.path.join(self.temp_dir, "high_compression.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.write(test_file, 'highly_compressible.txt') + except Exception as e: + self.skipTest(f"Failed to create high compression 7Z: {e}") + try: + documents = self.reader._load_data(sevenzip_path, max_compression_ratio=10.0) + self.assertIsInstance(documents, list) + except ValueError as e: + if "compression ratio" in str(e).lower(): + pass + else: + raise + +class TestSevenZipReaderRealWorldScenarios(unittest.TestCase): + """SevenZipReader 真实世界场景测试""" + def setUp(self): + self.reader = SevenZipReader() + self.temp_dir = tempfile.mkdtemp() + def tearDown(self): + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + def create_software_distribution_package(self): + dist_dir = os.path.join(self.temp_dir, "myapp_v2.0.0") + os.makedirs(dist_dir, exist_ok=True) + os.makedirs(os.path.join(dist_dir, "bin"), exist_ok=True) + os.makedirs(os.path.join(dist_dir, "lib"), exist_ok=True) + os.makedirs(os.path.join(dist_dir, "docs"), exist_ok=True) + os.makedirs(os.path.join(dist_dir, "config"), exist_ok=True) + os.makedirs(os.path.join(dist_dir, "examples"), exist_ok=True) + with open(os.path.join(dist_dir, "INSTALL.txt"), 'w', encoding='utf-8') as f: + f.write("""MyApp 2.0.0 安装说明\n========================\n系统要求:\n- Python 3.8+\n- 100MB 可用磁盘空间\n\n安装步骤:\n1. 解压此压缩包\n2. 运行 bin/install.py\n3. 按照提示完成配置\n\n技术支持:\n- 邮箱: support@myapp.com\n- 文档: docs/manual.html""") + with open(os.path.join(dist_dir, "bin", "install.py"), 'w', encoding='utf-8') as f: + f.write("""#!/usr/bin/env python3\nimport sys\nimport os\nimport shutil\n\ndef main():\n print("MyApp 安装程序")\n print("==============")\n if sys.version_info < (3, 8):\n print("错误: 需要 Python 3.8 或更高版本")\n sys.exit(1)\n lib_src = os.path.join(os.path.dirname(__file__), '../lib')\n lib_dest = '/usr/local/lib/myapp'\n try:\n shutil.copytree(lib_src, lib_dest)\n print(f"库文件已安装到: {lib_dest}")\n except Exception as e:\n print(f"安装库文件时出错: {e}")\n sys.exit(1)\n print("安装完成!")\n\nif __name__ == "__main__":\n main()""") + with open(os.path.join(dist_dir, "lib", "core.py"), 'w', encoding='utf-8') as f: + f.write("""class ApplicationCore:\n def __init__(self, config):\n self.config = config\n self.plugins = []\n def initialize(self):\n self._load_plugins()\n self._setup_database()\n def _load_plugins(self):\n import os\n plugin_dir = self.config.get('plugin_dir', './plugins')\n if os.path.exists(plugin_dir):\n for file in os.listdir(plugin_dir):\n if file.endswith('.py'):\n self._load_plugin(os.path.join(plugin_dir, file))\n def _load_plugin(self, plugin_path):\n plugin_name = os.path.basename(plugin_path).replace('.py', '')\n self.plugins.append(plugin_name)\n def _setup_database(self):\n db_config = self.config.get('database', {})\n if db_config:\n print(f"数据库配置: {db_config}")\n def run(self):\n print("MyApp 核心运行中...")\n print(f"已加载插件: {self.plugins}")""") + with open(os.path.join(dist_dir, "docs", "manual.html"), 'w', encoding='utf-8') as f: + f.write("""\n\n\n MyApp 用户手册\n\n\n

MyApp 用户手册

\n

快速开始

\n
    \n
  1. 安装软件
  2. \n
  3. 配置参数
  4. \n
  5. 启动应用
  6. \n
\n

功能特性

\n \n\n""") + with open(os.path.join(dist_dir, "config", "app_config.yaml"), 'w', encoding='utf-8') as f: + f.write("""# MyApp 配置文件模板\n# 复制此文件为 config.yaml 并修改相应配置\n\napplication:\n name: "MyApp"\n version: "2.0.0"\n debug: false\n log_level: "INFO"\n\ndatabase:\n host: "localhost"\n port: 5432\n name: "myapp_db"\n username: "db_user"\n # password: "请在此设置密码"\n\nserver:\n host: "0.0.0.0"\n port: 8080\n ssl_enabled: false\n\nplugins:\n enabled: true\n directory: "./plugins" """) + with open(os.path.join(dist_dir, "examples", "basic_usage.py"), 'w', encoding='utf-8') as f: + f.write("""#!/usr/bin/env python3\n\"\"\"\nMyApp 基础使用示例\n\"\"\"\n\nimport sys\nimport os\n\n# 添加 lib 目录到路径\nsys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))\n\nfrom core import ApplicationCore\n\ndef main():\n # 基础配置\n config = {\n 'application': {\n 'name': '示例应用',\n 'debug': True\n },\n 'database': {\n 'host': 'localhost',\n 'port': 5432,\n 'name': 'example_db'\n }\n }\n \n # 创建应用实例\n app = ApplicationCore(config)\n \n # 初始化应用\n app.initialize()\n \n # 运行应用\n app.run()\n\nif __name__ == "__main__":\n main()""") + return dist_dir + def test_software_distribution_package(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + dist_dir = self.create_software_distribution_package() + sevenzip_path = os.path.join(self.temp_dir, "myapp_distribution.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(dist_dir, 'myapp_v2.0.0') + except Exception as e: + self.skipTest(f"Failed to create distribution 7Z: {e}") + documents = self.reader._load_data(sevenzip_path) + self.assertGreater(len(documents), 5) + install_docs = [d for d in documents if 'INSTALL' in d.metadata.get('file_name', '')] + self.assertGreater(len(install_docs), 0) + config_docs = [d for d in documents if 'config' in d.metadata.get('file_name', '').lower()] + self.assertGreater(len(config_docs), 0) + core_docs = [d for d in documents if d.metadata.get('file_name') == 'core.py'] + self.assertGreater(len(core_docs), 0) + def test_documentation_archive_with_metadata(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + doc_dir = os.path.join(self.temp_dir, "documentation") + os.makedirs(doc_dir, exist_ok=True) + os.makedirs(os.path.join(doc_dir, "api"), exist_ok=True) + os.makedirs(os.path.join(doc_dir, "tutorials"), exist_ok=True) + os.makedirs(os.path.join(doc_dir, "guides"), exist_ok=True) + with open(os.path.join(doc_dir, "README.md"), 'w', encoding='utf-8') as f: + f.write("""# 项目文档\n这是 SevenZipReader 测试用的文档存档。\n包含 API 文档、教程和指南。""") + with open(os.path.join(doc_dir, "api", "rest_api.md"), 'w', encoding='utf-8') as f: + f.write("""# REST API 参考\n## 认证端点\n### POST /auth/login\n用户登录\n### GET /auth/logout\n用户登出\n## 数据端点\n### GET /api/data\n获取数据列表\n### POST /api/data\n创建新数据""") + with open(os.path.join(doc_dir, "tutorials", "quick_start.md"), 'w', encoding='utf-8') as f: + f.write("""# 快速入门教程\n## 第一步:环境准备\n安装 Python 和必要依赖。\n## 第二步:配置应用\n修改配置文件。\n## 第三步:运行测试\n执行测试命令验证安装。""") + with open(os.path.join(doc_dir, "guides", "development.md"), 'w', encoding='utf-8') as f: + f.write("""# 开发指南\n## 代码规范\n- 遵循 PEP 8\n- 编写单元测试\n- 使用类型注解\n## 提交规范\n- 清晰的提交信息\n- 关联问题编号\n- 通过所有测试""") + sevenzip_path = os.path.join(self.temp_dir, "documentation.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(doc_dir, '') + except Exception as e: + self.skipTest(f"Failed to create documentation 7Z: {e}") + project_metadata = {"project": "SevenZipReader", "category": "documentation", "version": "2.0.0", "author": "Test Team", "description": "测试文档存档"} + documents = self.reader._load_data(sevenzip_path, ext_info=project_metadata) + self.assertGreater(len(documents), 3) + for doc in documents: + self.assertEqual(doc.metadata.get('project'), 'SevenZipReader') + self.assertEqual(doc.metadata.get('category'), 'documentation') + md_docs = [d for d in documents if d.metadata.get('file_suffix') == '.md'] + self.assertGreaterEqual(len(md_docs), 3) + +class TestSevenZipReaderEdgeCases(unittest.TestCase): + """SevenZipReader 边界情况测试""" + def setUp(self): + self.reader = SevenZipReader() + self.temp_dir = tempfile.mkdtemp() + def tearDown(self): + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + def test_empty_7z_archive(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + empty_7z_path = os.path.join(self.temp_dir, "empty.7z") + try: + with py7zr.SevenZipFile(empty_7z_path, 'w') as archive: + pass + except Exception as e: + self.skipTest(f"Failed to create empty 7Z: {e}") + documents = self.reader._load_data(empty_7z_path) + self.assertIsInstance(documents, list) + self.assertEqual(len(documents), 0) + def test_special_characters_in_filenames(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + special_files = { + "测试文件_中文.txt": "包含中文文件名的测试文件", + "file with spaces.txt": "包含空格的文件名", + "file-with-dashes.txt": "包含连字符的文件名", + "file.with.dots.txt": "包含点的文件名", + "mixed_case_FILE.TXT": "混合大小写的文件名", + "unicode_测试_文件🎉.txt": "包含Unicode表情的文件名", + } + files_dir = os.path.join(self.temp_dir, "special_names") + os.makedirs(files_dir, exist_ok=True) + for filename, content in special_files.items(): + with open(os.path.join(files_dir, filename), 'w', encoding='utf-8') as f: + f.write(content) + sevenzip_path = os.path.join(self.temp_dir, "special_names.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(files_dir, '') + except Exception as e: + self.skipTest(f"Failed to create special names 7Z: {e}") + documents = self.reader._load_data(sevenzip_path) + self.assertEqual(len(documents), len(special_files)) + extracted_names = [doc.metadata.get('file_name') for doc in documents] + for original_name in special_files.keys(): + self.assertIn(original_name, extracted_names) + def test_deep_directory_structure(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + deep_dir = self.temp_dir + depth = 8 + for i in range(depth): + deep_dir = os.path.join(deep_dir, f"level_{i:02d}") + os.makedirs(deep_dir, exist_ok=True) + deep_file = os.path.join(deep_dir, "deep_nested_file.txt") + with open(deep_file, 'w', encoding='utf-8') as f: + f.write("这是位于深层嵌套目录中的文件。") + mid_file = os.path.join(self.temp_dir, "level_00", "level_01", "mid_level_file.yaml") + os.makedirs(os.path.dirname(mid_file), exist_ok=True) + with open(mid_file, 'w', encoding='utf-8') as f: + f.write("config:\n level: mid\n description: 中间层文件") + sevenzip_path = os.path.join(self.temp_dir, "deep_structure.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(self.temp_dir, '') + except Exception as e: + self.skipTest(f"Failed to create deep structure 7Z: {e}") + documents = self.reader._load_data(sevenzip_path) + self.assertGreater(len(documents), 0) + deep_files = [doc for doc in documents if 'deep_nested_file' in doc.metadata.get('file_name', '')] + self.assertEqual(len(deep_files), 1) + deep_doc = deep_files[0] + self.assertIn('level_07', deep_doc.metadata.get('archive_path', '')) + def test_path_handling_with_different_types(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + test_file = os.path.join(self.temp_dir, "path_test.txt") + with open(test_file, 'w', encoding='utf-8') as f: + f.write("路径处理测试文件") + sevenzip_path = os.path.join(self.temp_dir, "path_test.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.write(test_file, 'path_test.txt') + except Exception as e: + self.skipTest(f"Failed to create path test 7Z: {e}") + docs_str = self.reader._load_data(sevenzip_path) + docs_path = self.reader._load_data(Path(sevenzip_path)) + self.assertEqual(len(docs_str), len(docs_path)) + if docs_str and docs_path: + self.assertEqual(docs_str[0].text, docs_path[0].text) + self.assertEqual(docs_str[0].metadata['file_name'], docs_path[0].metadata['file_name']) + def test_mixed_text_encodings_and_formats(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + files_dir = os.path.join(self.temp_dir, "mixed_encodings") + os.makedirs(files_dir, exist_ok=True) + with open(os.path.join(files_dir, "utf8_bom.txt"), 'w', encoding='utf-8-sig') as f: + f.write("UTF-8 with BOM: 测试文本") + with open(os.path.join(files_dir, "utf8_nobom.txt"), 'w', encoding='utf-8') as f: + f.write("UTF-8 without BOM: 测试文本") + with open(os.path.join(files_dir, "unicode_chars.txt"), 'w', encoding='utf-8') as f: + f.write("Unicode 测试: 🌟🎉🚀 中文测试 ©®™") + sevenzip_path = os.path.join(self.temp_dir, "mixed_encodings.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(files_dir, '') + except Exception as e: + self.skipTest(f"Failed to create mixed encodings 7Z: {e}") + documents = self.reader._load_data(sevenzip_path) + self.assertEqual(len(documents), 3) + for doc in documents: + self.assertIsInstance(doc.text, str) + self.assertGreater(len(doc.text), 0) + self.assertIn('测试', doc.text) + +class TestSevenZipReaderPerformance(unittest.TestCase): + """SevenZipReader 性能测试""" + def setUp(self): + self.reader = SevenZipReader() + self.temp_dir = tempfile.mkdtemp() + def tearDown(self): + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + def test_large_number_of_small_files(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + files_dir = os.path.join(self.temp_dir, "many_small_files") + os.makedirs(files_dir, exist_ok=True) + file_count = 100 + for i in range(file_count): + filepath = os.path.join(files_dir, f"small_file_{i:03d}.txt") + with open(filepath, 'w', encoding='utf-8') as f: + f.write(f"这是小文件 {i} 的内容,用于性能测试。") + sevenzip_path = os.path.join(self.temp_dir, "many_small_files.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(files_dir, '') + except Exception as e: + self.skipTest(f"Failed to create many small files 7Z: {e}") + start_time = time.time() + documents = self.reader._load_data(sevenzip_path, max_files=file_count) + elapsed_time = time.time() - start_time + self.assertLessEqual(len(documents), file_count) + self.assertLess(elapsed_time, 30, f"处理 {file_count} 个文件耗时 {elapsed_time:.2f} 秒,超过性能要求") + print(f"\n性能测试: 处理 {len(documents)}/{file_count} 个文件耗时 {elapsed_time:.2f} 秒") + def test_reader_cache_efficiency(self): + try: + import py7zr + except ImportError: + self.skipTest("py7zr not available") + files_dir = os.path.join(self.temp_dir, "cache_test") + os.makedirs(files_dir, exist_ok=True) + file_types = {'script.py': "print('Python script')", 'data.json': '{"test": "data"}', 'config.yaml': "app:\n name: test", 'document.txt': "Text document content", 'readme.md': "# Markdown Document"} + for filename, content in file_types.items(): + with open(os.path.join(files_dir, filename), 'w', encoding='utf-8') as f: + f.write(content) + sevenzip_path = os.path.join(self.temp_dir, "cache_test.7z") + try: + with py7zr.SevenZipFile(sevenzip_path, 'w') as archive: + archive.writeall(files_dir, '') + except Exception as e: + self.skipTest(f"Failed to create cache test 7Z: {e}") + self.reader._reader_cache.clear() + documents1 = self.reader._load_data(sevenzip_path) + cache_size_after_first = len(self.reader._reader_cache) + self.assertGreater(cache_size_after_first, 0) + documents2 = self.reader._load_data(sevenzip_path) + cache_size_after_second = len(self.reader._reader_cache) + self.assertEqual(cache_size_after_first, cache_size_after_second) + self.assertEqual(len(documents1), len(documents2)) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file