mirror of
https://github.com/agentuniverse-ai/agentUniverse.git
synced 2026-02-09 01:59:19 +08:00
Merge pull request #462 from NJX-njx/master
feat: 扩展ReaderManager以支持更多文件类型的读取
This commit is contained in:
@@ -0,0 +1,65 @@
|
||||
# !/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
# @Time : 2025/9/29
|
||||
# @FileName: confluence_reader.py
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
from agentuniverse.agent.action.knowledge.reader.reader import Reader
|
||||
from agentuniverse.agent.action.knowledge.store.document import Document
|
||||
|
||||
|
||||
class ConfluenceReader(Reader):
    """Reader for Atlassian Confluence pages.

    Requires:
        pip install atlassian-python-api
    Credentials:
        site_url, username, token must be provided via ext_info or env.
    """

    def _load_data(self, page_id: str, ext_info: Optional[Dict] = None) -> List[Document]:
        """Load a single Confluence page and return it as one Document.

        Args:
            page_id: Confluence page id (required, non-empty).
            ext_info: optional dict; may carry credentials (site_url,
                username, token) and is merged into the result metadata.

        Returns:
            A single-element list containing the page text and metadata.

        Raises:
            ValueError: if page_id is falsy.
            EnvironmentError: if credentials cannot be resolved.
            ImportError: if atlassian-python-api is not installed.
        """
        if not page_id:
            raise ValueError("ConfluenceReader requires page_id")

        site_url, username, token = self._resolve_cred(ext_info)
        try:
            from atlassian import Confluence  # type: ignore
        except Exception as err:
            raise ImportError(
                "Install atlassian-python-api: `pip install atlassian-python-api`"
            ) from err

        # cloud=True targets Atlassian Cloud; the API token is passed as password.
        conf = Confluence(url=site_url, username=username, password=token, cloud=True)
        page = conf.get_page_by_id(page_id, expand="body.view,version,metadata.labels")
        html = page.get("body", {}).get("view", {}).get("value", "")

        text = self._html_to_text(html)
        metadata: Dict = {
            "source": "confluence",
            "page_id": page_id,
            "title": page.get("title"),
            "version": page.get("version", {}).get("number"),
        }
        if ext_info:
            metadata.update(ext_info)
        return [Document(text=text, metadata=metadata)]

    def _resolve_cred(self, ext_info: Optional[Dict]):
        """Resolve credentials from ext_info first, then the environment.

        Returns:
            Tuple of (site_url, username, token).

        Raises:
            EnvironmentError: if any of the three credentials is missing.
        """
        import os
        info = ext_info or {}
        site_url = info.get("site_url") or os.environ.get("CONFLUENCE_URL")
        username = info.get("username") or os.environ.get("CONFLUENCE_USERNAME")
        token = info.get("token") or os.environ.get("CONFLUENCE_TOKEN")
        if not site_url or not username or not token:
            raise EnvironmentError("Confluence credentials required: site_url, username, token")
        return site_url, username, token

    def _html_to_text(self, html: str) -> str:
        """Strip script/style/noscript tags and collapse HTML to plain text.

        Raises:
            ImportError: if beautifulsoup4/lxml are not installed.
        """
        try:
            from bs4 import BeautifulSoup  # type: ignore
        except Exception as err:
            raise ImportError("Install beautifulsoup4 and lxml for ConfluenceReader") from err
        soup = BeautifulSoup(html, "lxml")
        for tag in soup(["script", "style", "noscript"]):
            tag.extract()
        text = soup.get_text("\n")
        # Drop blank lines and surrounding whitespace from each remaining line.
        return "\n".join(line.strip() for line in text.splitlines() if line.strip())
|
||||
@@ -0,0 +1,6 @@
|
||||
name: 'default_confluence_reader'
|
||||
description: 'default Confluence reader'
|
||||
metadata:
|
||||
type: 'READER'
|
||||
module: 'agentuniverse.agent.action.knowledge.reader.cloud.confluence_reader'
|
||||
class: 'ConfluenceReader'
|
||||
@@ -0,0 +1,72 @@
|
||||
# !/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
# @Time : 2025/9/29
|
||||
# @FileName: google_docs_reader.py
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
from agentuniverse.agent.action.knowledge.reader.reader import Reader
|
||||
from agentuniverse.agent.action.knowledge.store.document import Document
|
||||
|
||||
|
||||
class GoogleDocsReader(Reader):
    """Reader for Google Docs via Google Drive HTML export.

    Requires:
        pip install google-api-python-client google-auth google-auth-oauthlib
    Credentials:
        A service account JSON path, passed via ext_info or the
        GOOGLE_SERVICE_ACCOUNT_JSON environment variable.
    """

    def _load_data(self, doc_id: str, ext_info: Optional[Dict] = None) -> List[Document]:
        """Export one Google Doc as HTML and return its plain text as a Document.

        Args:
            doc_id: Drive file id of the document (required, non-empty).
            ext_info: optional dict; may carry GOOGLE_SERVICE_ACCOUNT_JSON and
                is merged into the result metadata.

        Raises:
            ValueError: if doc_id is falsy.
            EnvironmentError: if no service account path is available.
            ImportError: if the Google API client libraries are missing.
        """
        if not doc_id:
            raise ValueError("GoogleDocsReader requires doc_id")

        service = self._build_drive_service(ext_info)
        html = self._export_html(service, doc_id)
        text = self._html_to_text(html)

        metadata: Dict = {"source": "google_docs", "doc_id": doc_id}
        if ext_info:
            metadata.update(ext_info)
        return [Document(text=text, metadata=metadata)]

    def _build_drive_service(self, ext_info: Optional[Dict]):
        """Build a read-only Drive v3 service from a service-account JSON file.

        Raises:
            ImportError: if google-api-python-client / google-auth are missing.
            EnvironmentError: if no service account path is provided.
        """
        try:
            from google.oauth2.service_account import Credentials  # type: ignore
            from googleapiclient.discovery import build  # type: ignore
        except Exception as err:
            raise ImportError(
                "Install Google API deps: `pip install google-api-python-client "
                "google-auth google-auth-oauthlib`"
            ) from err

        import os
        scopes = ['https://www.googleapis.com/auth/drive.readonly']
        sa_path = (ext_info or {}).get('GOOGLE_SERVICE_ACCOUNT_JSON') \
            or os.environ.get('GOOGLE_SERVICE_ACCOUNT_JSON')
        if not sa_path:
            raise EnvironmentError("Provide GOOGLE_SERVICE_ACCOUNT_JSON path for service account usage")
        creds = Credentials.from_service_account_file(sa_path, scopes=scopes)
        return build('drive', 'v3', credentials=creds)

    def _export_html(self, drive, file_id: str) -> str:
        """Download the Doc exported as text/html and return the decoded markup."""
        from googleapiclient.http import MediaIoBaseDownload  # type: ignore
        import io
        request = drive.files().export(fileId=file_id, mimeType='text/html')
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
        return buffer.getvalue().decode('utf-8', errors='ignore')

    def _html_to_text(self, html: str) -> str:
        """Strip script/style/noscript tags and collapse HTML to plain text.

        Raises:
            ImportError: if beautifulsoup4/lxml are not installed.
        """
        try:
            from bs4 import BeautifulSoup  # type: ignore
        except Exception as err:
            raise ImportError("Install beautifulsoup4 and lxml for GoogleDocsReader") from err
        soup = BeautifulSoup(html, "lxml")
        for tag in soup(["script", "style", "noscript"]):
            tag.extract()
        text = soup.get_text("\n")
        return "\n".join(line.strip() for line in text.splitlines() if line.strip())
|
||||
@@ -0,0 +1,6 @@
|
||||
name: 'default_google_docs_reader'
|
||||
description: 'default Google Docs reader'
|
||||
metadata:
|
||||
type: 'READER'
|
||||
module: 'agentuniverse.agent.action.knowledge.reader.cloud.google_docs_reader'
|
||||
class: 'GoogleDocsReader'
|
||||
@@ -0,0 +1,97 @@
|
||||
# !/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
# @Time : 2025/9/29
|
||||
# @FileName: notion_reader.py
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
from agentuniverse.agent.action.knowledge.reader.reader import Reader
|
||||
from agentuniverse.agent.action.knowledge.store.document import Document
|
||||
|
||||
|
||||
class NotionReader(Reader):
    """Reader for Notion pages/databases via the official Notion API.

    Requires:
        pip install notion-client
    Environment:
        NOTION_TOKEN must be provided (or passed via ext_info).
    """

    def _load_data(self, page_or_db_id: str, ext_info: Optional[Dict] = None) -> List[Document]:
        """Load a Notion page, or every row of a database, into one Document.

        The id is first tried as a page; if retrieval fails it is retried as
        a database and each row page is exported.

        Args:
            page_or_db_id: Notion page or database id (required, non-empty).
            ext_info: optional dict; may carry NOTION_TOKEN / notion_token and
                is merged into the result metadata.

        Raises:
            ValueError: if page_or_db_id is falsy.
            EnvironmentError: if no NOTION_TOKEN is available.
            ImportError: if notion-client is not installed.
            RuntimeError: if the id is readable neither as page nor database.
        """
        if not page_or_db_id:
            raise ValueError("NotionReader requires a Notion page or database id")

        token = None
        if ext_info:
            token = ext_info.get("NOTION_TOKEN") or ext_info.get("notion_token")
        if not token:
            import os
            token = os.environ.get("NOTION_TOKEN")
        if not token:
            raise EnvironmentError("NOTION_TOKEN is required for NotionReader")

        try:
            from notion_client import Client  # type: ignore
        except Exception as err:
            raise ImportError("Install notion-client: `pip install notion-client`") from err

        client = Client(auth=token)
        text_blocks: List[str] = []
        metadata: Dict = {"source": "notion", "id": page_or_db_id}

        # Try the id as a page first; any failure falls through to the
        # database interpretation below.
        try:
            client.pages.retrieve(page_id=page_or_db_id)  # existence check only
            metadata["type"] = "page"
            text_blocks.extend(self._export_page(client, page_or_db_id))
        except Exception:
            try:
                metadata["type"] = "database"
                for row in client.databases.query(database_id=page_or_db_id).get("results", []):
                    text_blocks.extend(self._export_page(client, row.get("id")))
            except Exception as e_db:
                raise RuntimeError(f"Failed to read Notion id={page_or_db_id}: {e_db}") from e_db

        text = "\n\n".join(b for b in text_blocks if b and b.strip())
        if ext_info:
            metadata.update(ext_info)
        return [Document(text=text, metadata=metadata)]

    def _export_page(self, client, page_id: str) -> List[str]:
        """Collect the text of a page's top-level blocks, following pagination.

        Note: child blocks (nested content) are not recursed into.
        """
        blocks: List[str] = []
        cursor = None
        while True:
            # NOTE(review): start_cursor=None on the first call — presumably
            # the notion-client drops None kwargs; confirm against its docs.
            children = client.blocks.children.list(block_id=page_id, start_cursor=cursor)
            for blk in children.get("results", []):
                txt = self._block_to_text(blk)
                if txt:
                    blocks.append(txt)
            if not children.get("has_more"):
                break
            cursor = children.get("next_cursor")
        return blocks

    def _block_to_text(self, block: Dict) -> str:
        """Convert one Notion block dict to plain text.

        Returns an empty string for unsupported block types; tables and
        images are replaced by placeholders.
        """
        block_type = block.get("type")
        data = block.get(block_type, {}) if block_type else {}

        def rich_text_to_str(items: List[Dict]) -> str:
            # Concatenate the plain_text of each rich-text fragment.
            return "".join(it.get("plain_text") or "" for it in items or [])

        text_bearing = (
            "paragraph", "heading_1", "heading_2", "heading_3", "quote",
            "callout", "bulleted_list_item", "numbered_list_item", "to_do",
            "toggle", "code",
        )
        if block_type in text_bearing:
            return rich_text_to_str(data.get("rich_text", []))
        if block_type == "table":
            return "[table omitted]"
        if block_type == "image":
            return "[image]"
        return ""
|
||||
@@ -0,0 +1,6 @@
|
||||
name: 'default_notion_reader'
|
||||
description: 'default Notion reader'
|
||||
metadata:
|
||||
type: 'READER'
|
||||
module: 'agentuniverse.agent.action.knowledge.reader.cloud.notion_reader'
|
||||
class: 'NotionReader'
|
||||
@@ -0,0 +1,79 @@
|
||||
# !/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
# @Time : 2025/9/29
|
||||
# @FileName: image_ocr_reader.py
|
||||
from typing import List, Optional, Dict, Union
|
||||
from pathlib import Path
|
||||
|
||||
from agentuniverse.agent.action.knowledge.reader.reader import Reader
|
||||
from agentuniverse.agent.action.knowledge.store.document import Document
|
||||
|
||||
|
||||
class ImageOCRReader(Reader):
    """OCR reader for image files.

    Preferred engine: PaddleOCR. Fallback: Tesseract or easyocr.
    Install tips:
        - pip install paddleocr paddlepaddle (or CPU/GPU variant)
        - or pip install pytesseract pillow
        - or pip install easyocr
    """

    def _load_data(self, file: Union[str, Path], ext_info: Optional[Dict] = None) -> List[Document]:
        """Run OCR on an image file and return the text as one Document.

        Args:
            file: path to the image (str or Path; must exist).
            ext_info: optional dict merged into the result metadata.

        Raises:
            FileNotFoundError: if the file does not exist.
            ImportError: if no OCR engine is installed.
        """
        if isinstance(file, str):
            file = Path(file)
        if not isinstance(file, Path) or not file.exists():
            raise FileNotFoundError(f"ImageOCRReader file not found: {file}")

        text, engine = self._ocr(file)

        # Record which engine produced the text so callers can assess quality.
        metadata: Dict = {"source": "image", "file_name": file.name, "engine": engine}
        if ext_info:
            metadata.update(ext_info)
        return [Document(text=text, metadata=metadata)]

    def _ocr(self, file: Path):
        """OCR the file with the first available engine.

        Tried in order: PaddleOCR, pytesseract, easyocr.

        Returns:
            Tuple of (extracted_text, engine_name).

        Raises:
            ImportError: if none of the three engines can be used.
        """
        # Try PaddleOCR first (best quality for mixed Chinese/English).
        try:
            from paddleocr import PaddleOCR  # type: ignore
            ocr = PaddleOCR(use_angle_cls=True, lang='ch')
            result = ocr.ocr(str(file), cls=True)
            lines: List[str] = []
            # Each page entry is a list of [box, (text, confidence)] pairs.
            for page in result:
                for line in page:
                    txt = line[1][0]
                    if txt:
                        lines.append(txt)
            return "\n".join(lines), "paddleocr"
        except Exception:
            pass  # fall through to pytesseract

        # Fallback to pytesseract (requires the tesseract binary installed).
        try:
            from PIL import Image  # type: ignore
            import pytesseract  # type: ignore
            img = Image.open(file)
            text = pytesseract.image_to_string(img, lang='chi_sim+eng')
            return text, "pytesseract"
        except Exception:
            pass  # fall through to easyocr

        # Last resort: easyocr.
        try:
            import easyocr  # type: ignore
            reader = easyocr.Reader(['ch_sim', 'en'])
            result = reader.readtext(str(file), detail=0)
            return "\n".join(result), "easyocr"
        except Exception as e_easy:
            raise ImportError(
                "No OCR engine available. Install one of: "
                "`pip install paddleocr paddlepaddle` or "
                "`pip install pytesseract pillow` or "
                "`pip install easyocr`"
            ) from e_easy
|
||||
@@ -0,0 +1,6 @@
|
||||
name: 'default_image_ocr_reader'
|
||||
description: 'default image OCR reader'
|
||||
metadata:
|
||||
type: 'READER'
|
||||
module: 'agentuniverse.agent.action.knowledge.reader.image.image_ocr_reader'
|
||||
class: 'ImageOCRReader'
|
||||
@@ -0,0 +1,114 @@
|
||||
# !/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
# @Time : 2025/9/29
|
||||
# @FileName: scanned_pdf_ocr_reader.py
|
||||
from typing import List, Optional, Dict, Union
|
||||
from pathlib import Path
|
||||
|
||||
from agentuniverse.agent.action.knowledge.reader.reader import Reader
|
||||
from agentuniverse.agent.action.knowledge.store.document import Document
|
||||
|
||||
|
||||
class ScannedPdfOCRReader(Reader):
    """Reader for scanned PDFs using page-level OCR.

    Strategy:
        1) Try to extract text with pypdf. If a page is empty, OCR that page.
        2) OCR engine order: PaddleOCR -> pytesseract -> easyocr.
    """

    def _load_data(self, file: Union[str, Path], ext_info: Optional[Dict] = None) -> List[Document]:
        """Extract text from a (possibly scanned) PDF into one Document.

        Args:
            file: path to the PDF (str or Path; must exist).
            ext_info: optional dict merged into the result metadata.

        Raises:
            FileNotFoundError: if the file does not exist.
            ImportError: if pdf2image is missing when OCR is needed.
        """
        if isinstance(file, str):
            file = Path(file)
        if not isinstance(file, Path) or not file.exists():
            raise FileNotFoundError(f"ScannedPdfOCRReader file not found: {file}")

        texts: List[str] = []
        engines: List[str] = []
        try:
            import pypdf  # type: ignore
            with open(file, "rb") as fp:
                pdf = pypdf.PdfReader(fp)
                for i, page in enumerate(pdf.pages):
                    txt = page.extract_text() or ""
                    if txt.strip():
                        texts.append(txt)
                        engines.append("pypdf")
                    else:
                        # Page has no extractable text layer -> OCR it.
                        ocr_txt, ocr_engine = self._ocr_pdf_page(file, i)
                        texts.append(ocr_txt)
                        engines.append(ocr_engine)
        except Exception:
            # pypdf unavailable or the file is unreadable by it: OCR every
            # page. If the page count cannot be determined this yields an
            # empty document rather than raising.
            for i in range(self._count_pdf_pages(file)):
                ocr_txt, ocr_engine = self._ocr_pdf_page(file, i)
                texts.append(ocr_txt)
                engines.append(ocr_engine)

        text_all = "\n\n".join(texts)
        engine_summary = ",".join(sorted(set(engines))) if engines else "unknown"
        metadata: Dict = {"source": "pdf", "file_name": file.name, "engine": engine_summary}
        if ext_info:
            metadata.update(ext_info)
        return [Document(text=text_all, metadata=metadata)]

    def _count_pdf_pages(self, file: Path) -> int:
        """Return the PDF's page count via pypdf, or 0 if it cannot be read."""
        try:
            import pypdf  # type: ignore
            with open(file, "rb") as fp:
                return len(pypdf.PdfReader(fp).pages)
        except Exception:
            return 0

    def _ocr_pdf_page(self, file: Path, page_index: int):
        """Rasterize one PDF page (0-based index) and OCR it.

        Returns:
            Tuple of (extracted_text, engine_name); ("", "unknown") when all
            engines fail, ("", "none") when rasterization yields no image.

        Raises:
            ImportError: if pdf2image is not installed.
        """
        try:
            from pdf2image import convert_from_path  # type: ignore
        except Exception as err:
            raise ImportError(
                "pdf2image is required: `pip install pdf2image`. Also install poppler."
            ) from err

        # pdf2image pages are 1-based.
        images = convert_from_path(str(file), first_page=page_index + 1, last_page=page_index + 1)
        if not images:
            return "", "none"
        page_image = images[0]

        # Try PaddleOCR.
        # NOTE(review): page_image is a PIL Image — PaddleOCR's ocr() expects
        # a path or ndarray, so this branch likely always falls through to
        # pytesseract; confirm and convert via numpy if Paddle is desired.
        try:
            from paddleocr import PaddleOCR  # type: ignore
            ocr = PaddleOCR(use_angle_cls=True, lang='ch')
            result = ocr.ocr(page_image, cls=True)
            lines = []
            for page in result:
                for line in page:
                    txt = line[1][0]
                    if txt:
                        lines.append(txt)
            return "\n".join(lines), "paddleocr"
        except Exception:
            pass  # fall through to pytesseract

        # Fallback to pytesseract (accepts PIL Images directly).
        try:
            import pytesseract  # type: ignore
            text = pytesseract.image_to_string(page_image, lang='chi_sim+eng')
            return text, "pytesseract"
        except Exception:
            pass  # fall through to easyocr

        # Last resort: easyocr.
        # NOTE(review): readtext() is given a PIL Image here as well —
        # easyocr expects a path/bytes/ndarray; verify before relying on it.
        try:
            import easyocr  # type: ignore
            reader = easyocr.Reader(['ch_sim', 'en'])
            result = reader.readtext(page_image, detail=0)
            return "\n".join(result), "easyocr"
        except Exception:
            # Deliberate best-effort: an unreadable page becomes empty text.
            return "", "unknown"
|
||||
@@ -0,0 +1,6 @@
|
||||
name: 'default_scanned_pdf_ocr_reader'
|
||||
description: 'default scanned PDF OCR reader'
|
||||
metadata:
|
||||
type: 'READER'
|
||||
module: 'agentuniverse.agent.action.knowledge.reader.image.scanned_pdf_ocr_reader'
|
||||
class: 'ScannedPdfOCRReader'
|
||||
@@ -20,7 +20,15 @@ class ReaderManager(ComponentManagerBase[Reader]):
|
||||
"pdf": "default_pdf_reader",
|
||||
"pptx": "default_pptx_reader",
|
||||
"docx": "default_docx_reader",
|
||||
"txt": "default_txt_reader"
|
||||
"txt": "default_txt_reader",
|
||||
# extended defaults for web & images
|
||||
"url": "default_web_page_reader",
|
||||
"png": "default_image_ocr_reader",
|
||||
"jpg": "default_image_ocr_reader",
|
||||
"jpeg": "default_image_ocr_reader",
|
||||
"bmp": "default_image_ocr_reader",
|
||||
"tiff": "default_image_ocr_reader",
|
||||
"webp": "default_image_ocr_reader",
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
"""Web readers package for agentUniverse."""
|
||||
@@ -0,0 +1,61 @@
|
||||
# !/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
# @Time : 2025/9/29
|
||||
# @FileName: rendered_web_page_reader.py
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
from agentuniverse.agent.action.knowledge.reader.reader import Reader
|
||||
from agentuniverse.agent.action.knowledge.store.document import Document
|
||||
|
||||
|
||||
class RenderedWebPageReader(Reader):
    """Reader for dynamic web pages using Playwright rendering.

    Requires:
        pip install playwright
        playwright install
    """

    def _load_data(self, url: str, ext_info: Optional[Dict] = None) -> List[Document]:
        """Render a JavaScript-heavy page and return its main text as a Document.

        Args:
            url: the page URL (required, non-empty string).
            ext_info: optional dict merged into the result metadata.

        Raises:
            ValueError: if url is not a non-empty string.
            ImportError: if playwright is not installed.
        """
        if not isinstance(url, str) or not url:
            raise ValueError("RenderedWebPageReader._load_data requires a non-empty url string")

        html = self._render_and_get_html(url)

        # Reuse the boilerplate-removal logic from WebPageReader; imported on
        # demand to avoid a hard dependency at module import time.
        from .web_page_reader import WebPageReader
        text, metadata_extra = WebPageReader()._extract_main_text(html, url)

        metadata: Dict = {"source": "web", "url": url, "rendered": True}
        metadata.update(metadata_extra)
        if ext_info:
            metadata.update(ext_info)

        return [Document(text=text, metadata=metadata)]

    def _render_and_get_html(self, url: str) -> str:
        """Load the URL in headless Chromium and return the rendered HTML.

        Waits for network idle so late-loading content is included; both
        default and navigation timeouts are 20 seconds.

        Raises:
            ImportError: if playwright is not installed.
        """
        try:
            from playwright.sync_api import sync_playwright  # type: ignore
        except Exception as err:
            raise ImportError(
                "playwright is required for RenderedWebPageReader. "
                "Install with `pip install playwright` and run `playwright install`"
            ) from err

        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=True)
            try:
                context = browser.new_context()
                page = context.new_page()
                page.set_default_timeout(20000)
                page.set_default_navigation_timeout(20000)
                page.goto(url)
                page.wait_for_load_state("networkidle")
                return page.content()
            finally:
                # Always release the browser, even when rendering fails.
                browser.close()
|
||||
@@ -0,0 +1,6 @@
|
||||
name: 'default_rendered_web_page_reader'
|
||||
description: 'default rendered web page reader (playwright)'
|
||||
metadata:
|
||||
type: 'READER'
|
||||
module: 'agentuniverse.agent.action.knowledge.reader.web.rendered_web_page_reader'
|
||||
class: 'RenderedWebPageReader'
|
||||
@@ -0,0 +1,108 @@
|
||||
# !/usr/bin/env python3
|
||||
# -*- coding:utf-8 -*-
|
||||
|
||||
# @Time : 2025/9/29
|
||||
# @FileName: web_page_reader.py
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
from agentuniverse.agent.action.knowledge.reader.reader import Reader
|
||||
from agentuniverse.agent.action.knowledge.store.document import Document
|
||||
|
||||
|
||||
class WebPageReader(Reader):
    """Reader for static web pages via HTTP fetching and boilerplate removal.

    Usage:
        reader = WebPageReader()
        docs = reader.load_data(url="https://example.com/article")

    Dependencies (optional but recommended):
        - trafilatura (preferred for article extraction)
        - readability-lxml (fallback for extraction)
        - beautifulsoup4 (last-resort plain text)
        - httpx or requests
    """

    def _load_data(self, url: str, ext_info: Optional[Dict] = None) -> List[Document]:
        """Fetch a URL and return its main text content as one Document.

        Args:
            url: the page URL (required, non-empty string).
            ext_info: optional dict merged into the result metadata.

        Raises:
            ValueError: if url is not a non-empty string.
            RuntimeError: if the page cannot be fetched or no extractor is
                installed.
        """
        if not isinstance(url, str) or not url:
            raise ValueError("WebPageReader._load_data requires a non-empty url string")

        html = self._fetch_html(url)
        text, metadata_extra = self._extract_main_text(html, url)

        metadata: Dict = {"source": "web", "url": url}
        metadata.update(metadata_extra)
        if ext_info:
            metadata.update(ext_info)

        return [Document(text=text, metadata=metadata)]

    def _fetch_html(self, url: str) -> str:
        """Fetch the URL with httpx, falling back to requests.

        Both clients follow redirects, use a 20-second timeout, and send a
        browser-like Accept header plus an identifying User-Agent.

        Raises:
            RuntimeError: if both HTTP clients fail (errors included).
        """
        headers = {
            "User-Agent": "agentUniverse/1.0 (+https://github.com/)",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        }
        try:
            import httpx  # type: ignore
            with httpx.Client(timeout=20.0, headers=headers) as client:
                resp = client.get(url, follow_redirects=True)
                resp.raise_for_status()
                return resp.text
        except Exception as e_httpx:
            try:
                import requests  # type: ignore
                resp = requests.get(url, timeout=20, headers=headers)
                resp.raise_for_status()
                return resp.text
            except Exception as e_requests:
                raise RuntimeError(
                    f"Failed to fetch url: {url}. httpx_error={e_httpx}, requests_error={e_requests}"
                ) from e_requests

    def _extract_main_text(self, html: str, url: str):
        """Extract the main article text from HTML.

        Extractors tried in order: trafilatura, readability-lxml, plain
        BeautifulSoup. The winning extractor's name is reported in the
        returned metadata.

        Returns:
            Tuple of (text, metadata_extra) where metadata_extra contains
            the "extractor" key.

        Raises:
            RuntimeError: if no extractor library is installed.
        """
        # Preferred: trafilatura's article extraction.
        try:
            import trafilatura  # type: ignore
            extracted = trafilatura.extract(html, include_links=False, include_images=False)
            if extracted and extracted.strip():
                return extracted.strip(), {"extractor": "trafilatura"}
        except Exception:
            pass  # fall through to readability

        # Fallback: readability-lxml summary rendered to text.
        try:
            from readability import Document as ReadabilityDocument  # type: ignore
            from bs4 import BeautifulSoup  # type: ignore
            article_html = ReadabilityDocument(html).summary(html_partial=True)
            soup = BeautifulSoup(article_html, "lxml")
            text = "\n".join(
                line.strip() for line in soup.get_text("\n").splitlines() if line.strip()
            )
            if text:
                return text, {"extractor": "readability"}
        except Exception:
            pass  # fall through to bs4

        # Last resort: whole-page plain text via BeautifulSoup.
        try:
            from bs4 import BeautifulSoup  # type: ignore
            soup = BeautifulSoup(html, "lxml")
            for tag in soup(["script", "style", "noscript"]):
                tag.extract()
            text = "\n".join(
                line.strip() for line in soup.get_text("\n").splitlines() if line.strip()
            )
            return text, {"extractor": "bs4"}
        except Exception as e_bs:
            raise RuntimeError(
                "Install one of the extractors: `pip install trafilatura` or "
                "`pip install readability-lxml beautifulsoup4 lxml`"
            ) from e_bs
|
||||
@@ -0,0 +1,6 @@
|
||||
name: 'default_web_page_reader'
|
||||
description: 'default web page reader (static)'
|
||||
metadata:
|
||||
type: 'READER'
|
||||
module: 'agentuniverse.agent.action.knowledge.reader.web.web_page_reader'
|
||||
class: 'WebPageReader'
|
||||
Reference in New Issue
Block a user