refactor: mv post-retrieval processors to third party dir

Jerry Z H
2025-10-31 13:41:28 +08:00
parent 1dd09f5119
commit c1191e34f7
13 changed files with 41 additions and 69 deletions

@@ -1,67 +0,0 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-
# @Time : 2025/10/13
# @Author : au-bot
# @FileName: merge_processor.py
from typing import List, Optional, Dict, Tuple
from agentuniverse.agent.action.knowledge.doc_processor.doc_processor import DocProcessor
from agentuniverse.agent.action.knowledge.store.document import Document
from agentuniverse.agent.action.knowledge.store.query import Query
from agentuniverse.base.config.component_configer.component_configer import ComponentConfiger
class MergeByMetadata(DocProcessor):
"""Merge documents that share the same metadata keys.
Documents are grouped by the tuple of values for `group_keys` inside
`Document.metadata`. Within each group, texts are concatenated with
`separator`. Optionally, keep only the best scored document's metadata
when `prefer_higher_score` is True.
"""
name: Optional[str] = "merge_by_metadata"
description: Optional[str] = "Merge docs by metadata keys"
group_keys: List[str] = []
separator: str = "\n\n"
prefer_higher_score: bool = True
def _make_group_key(self, metadata: Optional[Dict]) -> Tuple:
metadata = metadata or {}
return tuple(metadata.get(k) for k in self.group_keys)
def _process_docs(self, origin_docs: List[Document], query: Query | None = None) -> List[Document]:
if not origin_docs or not self.group_keys:
return origin_docs
grouped: Dict[Tuple, List[Document]] = {}
for doc in origin_docs:
key = self._make_group_key(doc.metadata)
grouped.setdefault(key, []).append(doc)
merged_docs: List[Document] = []
for _, docs in grouped.items():
if len(docs) == 1:
merged_docs.append(docs[0])
continue
combined_text = self.separator.join(d.text or "" for d in docs if d.text)
# choose metadata representative
rep = docs[0]
if self.prefer_higher_score:
rep = max(docs, key=lambda d: (d.metadata or {}).get("relevance_score", -1))
merged_docs.append(Document(text=combined_text, metadata=rep.metadata))
return merged_docs
def _initialize_by_component_configer(self, doc_processor_configer: ComponentConfiger) -> "DocProcessor":
super()._initialize_by_component_configer(doc_processor_configer)
if hasattr(doc_processor_configer, "group_keys"):
self.group_keys = list(doc_processor_configer.group_keys)
if hasattr(doc_processor_configer, "separator"):
self.separator = str(doc_processor_configer.separator)
if hasattr(doc_processor_configer, "prefer_higher_score"):
self.prefer_higher_score = bool(doc_processor_configer.prefer_higher_score)
return self
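A minimal sketch of the grouping behaviour, assuming the pydantic-style keyword construction that the field declarations above suggest; in the framework the processor is built from its YAML registration and `_process_docs` is invoked by the pipeline, so the direct call here is for illustration only:

    from agentuniverse.agent.action.knowledge.store.document import Document

    # hand-built input; in production these come from the retrieval store / reranker
    docs = [
        Document(text="part 1", metadata={"source": "a.md", "relevance_score": 0.9}),
        Document(text="part 2", metadata={"source": "a.md", "relevance_score": 0.4}),
        Document(text="other", metadata={"source": "b.md", "relevance_score": 0.7}),
    ]
    merger = MergeByMetadata(group_keys=["source"])
    merged = merger._process_docs(docs)
    # -> two documents: "part 1\n\npart 2" (metadata copied from the 0.9-score doc)
    #    and the untouched "other" document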

@@ -1,7 +0,0 @@
name: 'merge_by_metadata'
description: 'Merge retrieved documents by metadata keys'
metadata:
type: 'DOC_PROCESSOR'
module: 'agentuniverse.agent.action.knowledge.doc_processor.merge_processor'
class: 'MergeByMetadata'
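The `name` field above is the handle other components use to reference this processor: any knowledge component that lists 'merge_by_metadata' in its post-retrieval processing chain will run it over the retrieved documents. A combined configuration sketch covering all three processors in this commit appears at the end of the diff.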

@@ -1,59 +0,0 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-
# @Time : 2025/10/13
# @Author : au-bot
# @FileName: score_filter_processor.py
from typing import List, Optional
from agentuniverse.agent.action.knowledge.doc_processor.doc_processor import DocProcessor
from agentuniverse.agent.action.knowledge.store.document import Document
from agentuniverse.agent.action.knowledge.store.query import Query
from agentuniverse.base.config.component_configer.component_configer import ComponentConfiger
class ScoreThresholdFilter(DocProcessor):
"""Filter documents by a relevance score threshold.
This post-retrieval processor expects each `Document.metadata` to optionally
contain a numeric field `relevance_score` (e.g., from a reranker). Documents
with a score lower than `min_score` will be dropped. If a document has no
score, it is kept only when `keep_no_score` is True.
"""
name: Optional[str] = "score_threshold_filter"
description: Optional[str] = "Filter docs by relevance score"
min_score: float = 0.0
keep_no_score: bool = True
top_k: Optional[int] = None
def _process_docs(self, origin_docs: List[Document], query: Query | None = None) -> List[Document]:
if not origin_docs:
return origin_docs
filtered: List[Document] = []
for doc in origin_docs:
metadata = doc.metadata or {}
score = metadata.get("relevance_score")
if score is None:
if self.keep_no_score:
filtered.append(doc)
else:
if score >= self.min_score:
filtered.append(doc)
if self.top_k is not None and self.top_k > 0:
filtered = filtered[: self.top_k]
return filtered
def _initialize_by_component_configer(self, doc_processor_configer: ComponentConfiger) -> "DocProcessor":
super()._initialize_by_component_configer(doc_processor_configer)
if hasattr(doc_processor_configer, "min_score"):
self.min_score = float(doc_processor_configer.min_score)
if hasattr(doc_processor_configer, "keep_no_score"):
self.keep_no_score = bool(doc_processor_configer.keep_no_score)
if hasattr(doc_processor_configer, "top_k"):
self.top_k = int(doc_processor_configer.top_k)
return self
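A quick illustration of the filtering rules, again constructing the processor directly only for demonstration (the `Document` class is the same one imported at the top of the file):

    flt = ScoreThresholdFilter(min_score=0.5, keep_no_score=True, top_k=3)
    kept = flt._process_docs([
        Document(text="strong hit", metadata={"relevance_score": 0.8}),
        Document(text="weak hit", metadata={"relevance_score": 0.2}),
        Document(text="unscored", metadata={}),
    ])
    # -> "strong hit" and "unscored"; "weak hit" is dropped because 0.2 < 0.5,
    #    and top_k=3 leaves the two survivors untouched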

@@ -1,7 +0,0 @@
name: 'score_threshold_filter'
description: 'Filter documents by relevance score threshold'
metadata:
type: 'DOC_PROCESSOR'
module: 'agentuniverse.agent.action.knowledge.doc_processor.score_filter_processor'
class: 'ScoreThresholdFilter'

@@ -1,76 +0,0 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-
# @Time : 2025/10/13
# @Author : au-bot
# @FileName: summarize_processor.py
from typing import List, Optional
from agentuniverse.agent.action.knowledge.doc_processor.doc_processor import DocProcessor
from agentuniverse.agent.action.knowledge.store.document import Document
from agentuniverse.agent.action.knowledge.store.query import Query
from agentuniverse.base.config.component_configer.component_configer import ComponentConfiger
from agentuniverse.base.util.prompt_util import (
summarize_by_stuff,
summarize_by_map_reduce,
)
from agentuniverse.llm.llm_manager import LLMManager
from agentuniverse.prompt.prompt_manager import PromptManager
class SummarizeDocs(DocProcessor):
"""对召回文档进行摘要/总结合成。
支持两种模式:
- stuff: 直接将文本喂入 LLM 摘要,适合文档较短的情况;
- map_reduce: 先对分块做小结再合并,适合较长文本。
"""
name: Optional[str] = "summarize_docs"
description: Optional[str] = "Summarize retrieved documents"
llm: str = "__default_instance__"
mode: str = "stuff" # "stuff" | "map_reduce"
summary_prompt_version: str = "prompt_processor.summary_cn"
combine_prompt_version: str = "prompt_processor.combine_cn"
return_only_summary: bool = True
summary_metadata_key: str = "is_summary"
def _process_docs(self, origin_docs: List[Document], query: Query | None = None) -> List[Document]:
if not origin_docs:
return origin_docs
llm = LLMManager().get_instance_obj(self.llm)
texts = [d.text or "" for d in origin_docs if d.text]
summary_prompt = PromptManager().get_instance_obj(self.summary_prompt_version)
if self.mode == "map_reduce":
combine_prompt = PromptManager().get_instance_obj(self.combine_prompt_version)
summary_text = summarize_by_map_reduce(texts=texts, llm=llm, summary_prompt=summary_prompt,
combine_prompt=combine_prompt)
else:
summary_text = summarize_by_stuff(texts=texts, llm=llm, summary_prompt=summary_prompt)
summary_doc = Document(text=str(summary_text), metadata={self.summary_metadata_key: True})
if self.return_only_summary:
return [summary_doc]
return [summary_doc] + origin_docs
def _initialize_by_component_configer(self, doc_processor_configer: ComponentConfiger) -> "DocProcessor":
super()._initialize_by_component_configer(doc_processor_configer)
if hasattr(doc_processor_configer, "llm"):
self.llm = str(doc_processor_configer.llm)
if hasattr(doc_processor_configer, "mode"):
self.mode = str(doc_processor_configer.mode)
if hasattr(doc_processor_configer, "summary_prompt_version"):
self.summary_prompt_version = str(doc_processor_configer.summary_prompt_version)
if hasattr(doc_processor_configer, "combine_prompt_version"):
self.combine_prompt_version = str(doc_processor_configer.combine_prompt_version)
if hasattr(doc_processor_configer, "return_only_summary"):
self.return_only_summary = bool(doc_processor_configer.return_only_summary)
if hasattr(doc_processor_configer, "summary_metadata_key"):
self.summary_metadata_key = str(doc_processor_configer.summary_metadata_key)
return self
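The `_initialize_by_component_configer` method above defines exactly which keys the component YAML may override. A sketch of such an override, meant as extra keys on the registration file shown next (the LLM instance name is a placeholder, and `mode` must be either 'stuff' or 'map_reduce'):

    name: 'summarize_docs'
    llm: 'my_llm_instance'          # placeholder: name of a registered LLM component
    mode: 'map_reduce'              # chunk-level summaries first, then a combine pass
    summary_prompt_version: 'prompt_processor.summary_cn'
    combine_prompt_version: 'prompt_processor.combine_cn'
    return_only_summary: false      # keep the original documents alongside the summary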

@@ -1,7 +0,0 @@
name: 'summarize_docs'
description: 'Summarize or combine retrieved documents'
metadata:
type: 'DOC_PROCESSOR'
module: 'agentuniverse.agent.action.knowledge.doc_processor.summarize_processor'
class: 'SummarizeDocs'
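Because these components are configuration-driven, a typical post-retrieval chain references all three by name and runs the cheap steps before the LLM-backed summarization. A sketch, with the knowledge-level field name and surrounding keys assumed rather than taken from this diff:

    # hypothetical excerpt from a knowledge component YAML
    post_processors:
      - 'score_threshold_filter'   # drop low-relevance hits first
      - 'merge_by_metadata'        # then merge chunks that share the same source
      - 'summarize_docs'           # finally compress the survivors into one summary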