mirror of
https://github.com/agentuniverse-ai/agentUniverse.git
synced 2026-02-09 01:59:19 +08:00
594 lines
25 KiB
Python
594 lines
25 KiB
Python
# !/usr/bin/env python3
|
|
# -*- coding:utf-8 -*-
|
|
|
|
# @Time : 2024/3/12 19:36
|
|
# @Author : heji
|
|
# @Email : lc299034@antgroup.com
|
|
# @FileName: agent.py
|
|
import json
|
|
import uuid
|
|
from abc import abstractmethod, ABC
|
|
from datetime import datetime
|
|
from threading import Thread
|
|
from typing import Optional, Any, List
|
|
|
|
from langchain_core.runnables import RunnableSerializable, RunnableConfig
|
|
from langchain_core.utils.json import parse_json_markdown
|
|
|
|
from agentuniverse.agent.action.knowledge.knowledge import Knowledge
|
|
from agentuniverse.agent.action.knowledge.knowledge_manager import \
|
|
KnowledgeManager
|
|
from agentuniverse.agent.action.knowledge.store.document import Document
|
|
from agentuniverse.agent.action.tool.tool import Tool
|
|
from agentuniverse.agent.action.tool.tool_manager import ToolManager
|
|
from agentuniverse.agent.action.toolkit.toolkit_manager import ToolkitManager
|
|
from agentuniverse.agent.agent_model import AgentModel
|
|
from agentuniverse.agent.input_object import InputObject
|
|
from agentuniverse.agent.memory.memory import Memory
|
|
from agentuniverse.agent.memory.memory_manager import MemoryManager
|
|
from agentuniverse.agent.memory.message import Message
|
|
from agentuniverse.agent.output_object import OutputObject
|
|
from agentuniverse.agent.plan.planner.planner import Planner
|
|
from agentuniverse.agent.plan.planner.planner_manager import PlannerManager
|
|
from agentuniverse.agent.plan.planner.react_planner.stream_callback import \
|
|
InvokeCallbackHandler
|
|
from agentuniverse.base.annotation.trace import trace_agent
|
|
from agentuniverse.base.component.component_base import ComponentBase
|
|
from agentuniverse.base.component.component_enum import ComponentEnum
|
|
from agentuniverse.base.config.application_configer.application_config_manager \
|
|
import ApplicationConfigManager
|
|
from agentuniverse.base.config.component_configer.configers.agent_configer \
|
|
import AgentConfiger
|
|
from agentuniverse.base.context.framework_context_manager import \
|
|
FrameworkContextManager
|
|
from agentuniverse.base.util.agent_util import process_agent_llm_config
|
|
from agentuniverse.base.util.common_util import stream_output
|
|
from agentuniverse.base.util.logging.logging_util import LOGGER
|
|
from agentuniverse.base.util.memory_util import generate_messages, get_memory_string
|
|
from agentuniverse.base.util.system_util import process_dict_with_funcs, is_system_builtin
|
|
from agentuniverse.base.tracing.au_trace_manager import AuTraceManager
|
|
from agentuniverse.llm.llm import LLM
|
|
from agentuniverse.llm.llm_manager import LLMManager
|
|
from agentuniverse.prompt.chat_prompt import ChatPrompt
|
|
from agentuniverse.prompt.prompt import Prompt
|
|
from agentuniverse.prompt.prompt_manager import PromptManager
|
|
from agentuniverse.prompt.prompt_model import AgentPromptModel
|
|
|
|
|
|
class Agent(ComponentBase, ABC):
|
|
"""The parent class of all agent models, containing only attributes."""
|
|
|
|
agent_model: Optional[AgentModel] = None
|
|
|
|
def __init__(self):
    """Create the agent and register it as an AGENT-type component."""
    agent_component_type = ComponentEnum.AGENT
    super().__init__(component_type=agent_component_type)
|
|
|
|
@abstractmethod
def input_keys(self) -> list[str]:
    """List the keys that must be present in the agent's input.

    Abstract: concrete agents supply the list. `input_check` raises when any
    of these keys is missing from the keyword arguments given to the agent.

    Returns:
        list[str]: the required input key names.
    """
    ...
|
|
|
|
@abstractmethod
def output_keys(self) -> list[str]:
    """List the keys that must be present in the agent's parsed result.

    Abstract: concrete agents supply the list. `output_check` raises when any
    of these keys is missing from the parsed result.

    Returns:
        list[str]: the required output key names.
    """
    ...
|
|
|
|
@abstractmethod
def parse_input(self, input_object: InputObject, agent_input: dict) -> dict:
    """Fill `agent_input` with agent-specific values taken from the request.

    Abstract hook called at the end of `pre_parse_input`, which ignores the
    return value and hands the same `agent_input` dict on to execution.

    Args:
        input_object (InputObject): input parameters passed by the user.
        agent_input (dict): agent input already pre-parsed by the agent.
    Returns:
        dict: agent input parsed from `input_object` by the user.
    """
    ...
|
|
|
|
@abstractmethod
def parse_result(self, agent_result: dict) -> dict:
    """Convert the raw execution result into the agent's output dict.

    Abstract hook: `run` and `async_run` pass the returned value to
    `output_check` and then wrap it in an `OutputObject`.

    Args:
        agent_result (dict): the raw result of the agent.
    Returns:
        dict: the parsed result of the agent.
    """
    ...
|
|
|
|
def update_trace_context(self, input_object: InputObject):
    """Copy caller-supplied trace identifiers into the trace context.

    Args:
        input_object (InputObject): input parameters passed by the user; its
            `session_id` and `trace_id` entries are applied only when truthy.
    """
    if session_id := input_object.get_data("session_id"):
        AuTraceManager().trace_context.set_session_id(session_id)

    if trace_id := input_object.get_data("trace_id"):
        AuTraceManager().trace_context.set_trace_id(trace_id)
|
|
|
|
@trace_agent
def run(self, **kwargs) -> OutputObject:
    """Agent instance running entry.

    Pipeline: check the input keys, wrap the keyword arguments in an
    `InputObject`, update the trace context, pre-parse the input, execute,
    parse the result, check the output keys, and wrap the result.

    Args:
        **kwargs: the raw agent input; must contain every key of `input_keys()`.
    Returns:
        OutputObject: Agent execution result
    """
    self.input_check(kwargs)
    request = InputObject(kwargs)

    self.update_trace_context(request)

    prepared_input = self.pre_parse_input(request)
    raw_result = self.execute(request, prepared_input)
    parsed_result = self.parse_result(raw_result)

    self.output_check(parsed_result)
    return OutputObject(parsed_result)
|
|
|
|
@trace_agent
async def async_run(self, **kwargs) -> OutputObject:
    """Asynchronous agent running entry.

    Same pipeline as `run`, except that execution is delegated to the awaited
    `async_execute`.

    Args:
        **kwargs: the raw agent input; must contain every key of `input_keys()`.
    Returns:
        OutputObject: Agent execution result
    """
    self.input_check(kwargs)
    request = InputObject(kwargs)
    self.update_trace_context(request)

    prepared_input = self.pre_parse_input(request)
    raw_result = await self.async_execute(request, prepared_input)
    parsed_result = self.parse_result(raw_result)

    self.output_check(parsed_result)
    return OutputObject(parsed_result)
|
|
|
|
def execute(self, input_object: InputObject, agent_input: dict) -> dict:
    """Execute agent instance.

    Looks up the planner named under `plan.planner.name` in the agent model
    and invokes it.

    Args:
        input_object (InputObject): input parameters passed by the user.
        agent_input (dict): agent input parsed from `input_object` by the user.

    Returns:
        dict: planner result generated by the planner execution.
    """
    planner_name = self.agent_model.plan.get('planner').get('name')
    planner: Planner = PlannerManager().get_instance_obj(planner_name)
    return planner.invoke(self.agent_model, agent_input, input_object)
|
|
|
|
async def async_execute(self, input_object: InputObject, agent_input: dict) -> dict:
    """Asynchronous counterpart of `execute`, awaited by `async_run`.

    The base implementation does no work and yields None.

    Args:
        input_object (InputObject): input parameters passed by the user.
        agent_input (dict): agent input parsed from `input_object` by the user.
    """
    return None
|
|
|
|
def pre_parse_input(self, input_object) -> dict:
    """Agent execution parameter pre-parsing.

    Seeds the agent input with the common request fields (falling back to an
    empty value when a field is missing or falsy), today's date and the agent
    name, then lets `parse_input` add agent-specific entries in place.

    Args:
        input_object (InputObject): input parameters passed by the user.
    Returns:
        dict: agent input preparsed by the agent.
    """
    fetch = input_object.get_data
    agent_input = {
        'chat_history': fetch('chat_history') or '',
        'background': fetch('background') or '',
        'image_urls': fetch('image_urls') or [],
        'audio_url': fetch('audio_url') or '',
        'date': datetime.now().strftime('%Y-%m-%d'),
        'session_id': fetch('session_id') or '',
        'agent_id': self.agent_model.info.get('name', ''),
    }
    # parse_input mutates agent_input; its return value is intentionally unused.
    self.parse_input(input_object, agent_input)
    return agent_input
|
|
|
|
def get_instance_code(self) -> str:
    """Return the full name of the agent.

    Returns:
        str: `<appname>.<component type, lower-cased>.<agent name>`.
    """
    app_name = ApplicationConfigManager().app_configer.base_info_appname
    agent_name = self.agent_model.info.get('name')
    component_kind = self.component_type.value.lower()
    return f'{app_name}.{component_kind}.{agent_name}'
|
|
|
|
def input_check(self, kwargs: dict):
    """Agent parameter check.

    Args:
        kwargs (dict): the raw keyword arguments given to the agent.
    Raises:
        Exception: for the first key of `input_keys()` missing from `kwargs`.
    """
    for required_key in self.input_keys():
        if required_key in kwargs:
            continue
        raise Exception(f'Input must have key: {required_key}.')
|
|
|
|
def output_check(self, kwargs: dict):
    """Agent result check.

    Args:
        kwargs (dict): the parsed agent result.
    Raises:
        Exception: when `kwargs` is not a dict, or for the first key of
            `output_keys()` missing from it.
    """
    if not isinstance(kwargs, dict):
        raise Exception('Output type must be dict.')
    for required_key in self.output_keys():
        if required_key in kwargs:
            continue
        raise Exception(f'Output must have key: {required_key}.')
|
|
|
|
def initialize_by_component_configer(self, component_configer: AgentConfiger) -> 'Agent':
    """Initialize the Agent by the AgentConfiger object.

    Loads the configuration, runs the profile, memory and action sections
    through `process_dict_with_funcs`, applies `process_agent_llm_config` to
    the profile, and stores the resulting `AgentModel` on this agent.

    Args:
        component_configer(AgentConfiger): the ComponentConfiger object
    Returns:
        Agent: the Agent object
    """
    loaded_config: Optional[AgentConfiger] = component_configer.load()

    info: Optional[dict] = loaded_config.info

    profile: Optional[dict] = process_dict_with_funcs(
        loaded_config.profile, component_configer.yaml_func_instance)
    profile = process_agent_llm_config(
        info.get('name'), profile, component_configer.default_llm_configer)

    plan: Optional[dict] = loaded_config.plan

    memory: Optional[dict] = process_dict_with_funcs(
        loaded_config.memory, component_configer.yaml_func_instance)

    action: Optional[dict] = process_dict_with_funcs(
        loaded_config.action, component_configer.yaml_func_instance)

    self.agent_model = AgentModel(
        info=info,
        profile=profile,
        plan=plan,
        memory=memory,
        action=action,
    )
    return self
|
|
|
|
def langchain_run(self, input: str, callbacks=None, **kwargs):
    """Run the agent model using LangChain.

    Args:
        input (str): the tool input; a JSON (optionally markdown-fenced)
            string that must decode to a dict of agent keyword arguments.
        callbacks: forwarded to `run` as the `callbacks` keyword argument.
        **kwargs: extra keyword arguments forwarded to `run`.
    Returns:
        dict | str: the values of `output_keys()` taken from the agent
        output, or an error message string when `input` is not usable JSON.
    """
    invalid_input_message = "Error , Your Action Input is not a valid JSON string"
    try:
        agent_kwargs = parse_json_markdown(input)
    except Exception as e:
        # Log the offending input itself; the previous message printed the
        # literal text "parse_result" instead of any value.
        LOGGER.error(f"langchain run parse_json_markdown error,input({input}) error({str(e)})")
        return invalid_input_message
    if not isinstance(agent_kwargs, dict):
        # Valid JSON that is not an object (list, number, ...) cannot be
        # unpacked into keyword arguments.
        LOGGER.error(f"langchain run parse_json_markdown error,input({input}) "
                     f"error(expected a JSON object, got {type(agent_kwargs).__name__})")
        return invalid_input_message
    output_object = self.run(**agent_kwargs, callbacks=callbacks, **kwargs)
    return {key: output_object.get_data(key) for key in self.output_keys()}
|
|
|
|
async def async_langchain_run(self, input: str, callbacks=None, **kwargs):
    """Run the agent model asynchronously using LangChain.

    Args:
        input (str): the tool input; a JSON (optionally markdown-fenced)
            string that must decode to a dict of agent keyword arguments.
        callbacks: forwarded to `async_run` as the `callbacks` keyword argument.
        **kwargs: extra keyword arguments forwarded to `async_run`.
    Returns:
        dict | str: the values of `output_keys()` taken from the agent
        output, or an error message string when `input` is not usable JSON.
    """
    invalid_input_message = "Error , Your Action Input is not a valid JSON string"
    try:
        agent_kwargs = parse_json_markdown(input)
    except Exception as e:
        # Log the offending input itself; the previous message printed the
        # literal text "parse_result" instead of any value.
        LOGGER.error(f"langchain run parse_json_markdown error,input({input}) error({str(e)})")
        return invalid_input_message
    if not isinstance(agent_kwargs, dict):
        # Valid JSON that is not an object (list, number, ...) cannot be
        # unpacked into keyword arguments.
        LOGGER.error(f"langchain run parse_json_markdown error,input({input}) "
                     f"error(expected a JSON object, got {type(agent_kwargs).__name__})")
        return invalid_input_message
    output_object = await self.async_run(**agent_kwargs, callbacks=callbacks, **kwargs)
    return {key: output_object.get_data(key) for key in self.output_keys()}
|
|
|
|
def as_langchain_tool(self):
    """Convert to LangChain tool.

    The tool is named after the agent, calls `langchain_run`, and its
    description is the agent description followed by instructions showing
    the JSON object (one entry per input key) the caller has to provide.

    Returns:
        Tool: a LangChain `Tool` wrapping this agent.
    """
    from langchain.agents.tools import Tool

    input_keys = self.input_keys()
    # One placeholder entry per required key, in declaration order.
    format_str = json.dumps(dict.fromkeys(input_keys, "input val"))

    args_description = (
        "\n"
        f"to use this tool,your input must be a json string,must contain all keys of {input_keys},\n"
        "and the value of the key must be a json string,the format of the json string is as follows:\n"
        f"```{format_str}```\n"
    )
    agent_info = self.agent_model.info
    return Tool(
        name=agent_info.get("name"),
        func=self.langchain_run,
        # An agent without a configured description must not break the
        # concatenation with a None + str TypeError.
        description=(agent_info.get("description") or "") + args_description,
    )
|
|
|
|
async def async_as_langchain_tool(self):
    """Convert to LangChain tool backed by the asynchronous runner.

    The tool is named after the agent and its description is the agent
    description followed by instructions showing the JSON object (one entry
    per input key) the caller has to provide.

    Returns:
        Tool: a LangChain `Tool` wrapping `async_langchain_run`.
    """
    from langchain.agents.tools import Tool

    input_keys = self.input_keys()
    # One placeholder entry per required key, in declaration order.
    format_str = json.dumps(dict.fromkeys(input_keys, "input val"))

    args_description = (
        "\n"
        f"to use this tool,your input must be a json string,must contain all keys of {input_keys},\n"
        "and the value of the key must be a json string,the format of the json string is as follows:\n"
        f"```{format_str}```\n"
    )
    agent_info = self.agent_model.info
    return Tool(
        name=agent_info.get("name"),
        # `func` is kept as before for callers that await `tool.func(...)`.
        func=self.async_langchain_run,
        # Registering the coroutine lets LangChain's async tool path await
        # the agent instead of receiving an un-awaited coroutine object.
        coroutine=self.async_langchain_run,
        # An agent without a configured description must not break the
        # concatenation with a None + str TypeError.
        description=(agent_info.get("description") or "") + args_description,
    )
|
|
|
|
def process_llm(self, **kwargs) -> LLM:
    """Resolve the LLM instance this agent should use.

    Args:
        **kwargs: an optional `llm_name` overrides the name configured under
            `llm_model.name` in the agent profile.
    Returns:
        LLM: the instance returned by `LLMManager` for the resolved name.
    """
    llm_name = kwargs.get('llm_name')
    if not llm_name:
        llm_name = self.agent_model.profile.get('llm_model', {}).get('name')

    llm: LLM = LLMManager().get_instance_obj(llm_name)
    if is_system_builtin(llm):
        LOGGER.warn("The system built-in LLM configuration YAML will be removed in the next version. "
                    "Please configure your own LLM model in the application's YAML file.")
    return llm
|
|
|
|
def process_memory(self, agent_input: dict, **kwargs) -> Memory | None:
    """Resolve the agent's memory and prime it with the request chat history.

    Args:
        agent_input (dict): agent input; its `chat_history` is converted to
            messages and, when non-empty, added to the memory.
        **kwargs: optional `memory_name`, `conversation_memory` and
            `llm_name` overrides for the values in the agent model.
    Returns:
        Memory | None: the result of `set_by_agent_model` on the chosen
        memory, or None when neither memory can be resolved.
    """
    memory_conf = self.agent_model.memory

    primary: Memory = MemoryManager().get_instance_obj(
        kwargs.get('memory_name') or memory_conf.get('name'))
    conversation: Memory = MemoryManager().get_instance_obj(
        component_instance_name=kwargs.get('conversation_memory') or memory_conf.get('conversation_memory'))

    # The conversation memory is only a fallback for a missing primary memory.
    active = conversation if primary is None else primary
    if active is None:
        return None

    # generate a list of temporary messages from the given chat history and add them to the memory instance.
    history_messages: list[Message] = generate_messages(agent_input.get('chat_history'))
    if history_messages:
        active.add(history_messages, **agent_input)

    agent_llm_name = kwargs.get('llm_name') or self.agent_model.profile.get('llm_model', {}).get('name')
    return active.set_by_agent_model(agent_llm_name=agent_llm_name)
|
|
|
|
def invoke_chain(self, chain: RunnableSerializable[Any, str], agent_input: dict, input_object: InputObject,
                 **kwargs):
    """Run the chain, streaming its tokens when the chain's LLM streams.

    Args:
        chain: the runnable chain to execute.
        agent_input (dict): the chain input.
        input_object (InputObject): the request; its `output_stream` entry
            receives every streamed token.
    Returns:
        The chain's `invoke` result, or for a streaming chain the chunks
        merged by `generate_result`.
    """
    if self.judge_chain_stream(chain):
        chunks = []
        for chunk in chain.stream(input=agent_input, config=self.get_run_config()):
            stream_output(input_object.get_data('output_stream', None), {
                'type': 'token',
                'data': {
                    'chunk': chunk,
                    'agent_info': self.agent_model.info
                }
            })
            chunks.append(chunk)
        return self.generate_result(chunks)

    return chain.invoke(input=agent_input, config=self.get_run_config())
|
|
|
|
async def async_invoke_chain(self, chain: RunnableSerializable[Any, str], agent_input: dict,
                             input_object: InputObject, **kwargs):
    """Asynchronously run the chain, streaming tokens when its LLM streams.

    Args:
        chain: the runnable chain to execute.
        agent_input (dict): the chain input.
        input_object (InputObject): the request; its `output_stream` entry
            receives every streamed token.
    Returns:
        The chain's `ainvoke` result, or for a streaming chain the chunks
        merged by `generate_result`.
    """
    if self.judge_chain_stream(chain):
        chunks = []
        async for chunk in chain.astream(input=agent_input, config=self.get_run_config()):
            stream_output(input_object.get_data('output_stream', None), {
                'type': 'token',
                'data': {
                    'chunk': chunk,
                    'agent_info': self.agent_model.info
                }
            })
            chunks.append(chunk)
        return self.generate_result(chunks)

    return await chain.ainvoke(input=agent_input, config=self.get_run_config())
|
|
|
|
def judge_chain_stream(self,
                       chain: RunnableSerializable[Any, str]) -> bool:
    """Tell whether the chain should be consumed as a token stream.

    The first step exposing an `llm` (checked first) or `llm_channel`
    attribute decides: a `streaming` entry in that step's `kwargs` wins,
    otherwise the attribute's own `streaming` flag is used.

    Args:
        chain: a runnable exposing its steps via `steps`.
    Returns:
        bool: the streaming flag, or False when no step carries an LLM.
    """
    for step in chain.steps:
        for llm_attr in ("llm", "llm_channel"):
            if hasattr(step, llm_attr):
                return step.kwargs.get('streaming', getattr(step, llm_attr).streaming)
    return False
|
|
|
|
def generate_result(self, data: list[dict | str]):
|
|
if isinstance(data[0], str):
|
|
return "".join(data)
|
|
text = [val.get('text') for val in data]
|
|
reasoning_content = [val.get('reasoning_content', "") for val in data]
|
|
return {
|
|
'text': "".join(text),
|
|
'reasoning_content': "".join(reasoning_content)
|
|
}
|
|
|
|
@property
def tool_names(self) -> list:
    """Names of the tools available to this agent, as computed by `_get_tool_names`."""
    names = self._get_tool_names()
    return names
|
|
|
|
def _get_tool_names(self) -> list:
    """Collect the agent's tool names from its `tool` and `toolkit` settings.

    Returns:
        list: a new list holding the configured tool names followed by the
        tool names of every configured toolkit that can be resolved.
    """
    action = self.agent_model.action
    # Copy the configured list: extending it in place would append the
    # toolkit tools to the agent model's own config on every call.
    tool_name_list = list(action.get('tool') or [])
    for toolkit_name in action.get('toolkit') or []:
        toolkit = ToolkitManager().get_instance_obj(toolkit_name)
        if toolkit is None:
            # Unknown toolkit: skip it, as invoke_tools does for unknown tools.
            continue
        tool_name_list.extend(toolkit.tool_names)
    return tool_name_list
|
|
|
|
def invoke_tools(self, input_object: InputObject, **kwargs) -> str:
    """Run the agent's tools and join their outputs.

    Each tool receives the entries of `input_object` named by its
    `input_keys`. Tools that cannot be resolved are skipped, and a failing
    tool is logged and skipped so the remaining tools still run.

    Args:
        input_object (InputObject): input parameters passed by the user.
        **kwargs: an optional `tool_names` list overrides `self.tool_names`.
    Returns:
        str: the stringified tool results separated by blank lines, or an
        empty string when there are no tools.
    """
    tool_names = kwargs.get('tool_names') or self.tool_names
    if not tool_names:
        return ''

    tool_results: list = []

    for tool_name in tool_names:
        tool: Tool = ToolManager().get_instance_obj(tool_name)
        if tool is None:
            continue
        try:
            tool_input = {key: input_object.get_data(key) for key in tool.input_keys}
            tool_results.append(str(tool.run(**tool_input)))
        except Exception as e:
            # Best effort by design, but never swallow KeyboardInterrupt or
            # SystemExit, and keep the cause in the log.
            LOGGER.warn(f'Tool {tool_name} call failed, maybe invalid or lack arguments: {e}')
    return "\n\n".join(tool_results)
|
|
|
|
async def async_invoke_tools(self, input_object: InputObject, **kwargs) -> str:
    """Asynchronously run the agent's tools and join their outputs.

    Each tool receives the entries of `input_object` named by its
    `input_keys`. Tools that cannot be resolved are skipped.

    Args:
        input_object (InputObject): input parameters passed by the user.
        **kwargs: an optional `tool_names` list overrides `self.tool_names`.
    Returns:
        str: the stringified tool results separated by blank lines, or an
        empty string when there are no tools.
    """
    # Use the same source as invoke_tools so toolkit-provided tools are
    # included, not only the names listed under `tool`.
    tool_names = kwargs.get('tool_names') or self.tool_names
    if not tool_names:
        return ''
    tool_results: list = []
    for tool_name in tool_names:
        tool: Tool = ToolManager().get_instance_obj(tool_name)
        if tool is None:
            continue
        tool_input = {key: input_object.get_data(key) for key in tool.input_keys}
        # str(): a tool may return a non-string, which join() would reject.
        tool_results.append(str(await tool.async_run(**tool_input)))
    return "\n\n".join(tool_results)
|
|
|
|
def invoke_knowledge(self, query_str: str, input_object: InputObject, **kwargs) -> str:
    """Query the agent's knowledge bases and join their LLM-ready output.

    Args:
        query_str (str): the query; an empty query yields an empty result.
        input_object (InputObject): the request; its dict form is forwarded
            to every knowledge query as keyword arguments.
        **kwargs: an optional `knowledge_names` list overrides the
            `knowledge` entry of the agent's action config.
    Returns:
        str: the `to_llm` renderings separated by blank lines, or an empty
        string when there is nothing to query.
    """
    knowledge_names = kwargs.get('knowledge_names') or self.agent_model.action.get('knowledge', [])
    if not (knowledge_names and query_str):
        return ''

    rendered: list = []
    for name in knowledge_names:
        knowledge: Knowledge = KnowledgeManager().get_instance_obj(name)
        if knowledge is not None:
            documents: List[Document] = knowledge.query_knowledge(
                query_str=query_str,
                **input_object.to_dict()
            )
            rendered.append(knowledge.to_llm(documents))
    return "\n\n".join(rendered)
|
|
|
|
def process_prompt(self, agent_input: dict, **kwargs) -> ChatPrompt:
    """Build the chat prompt from the agent profile and the versioned prompt.

    The profile's introduction/target/instruction and, when a prompt version
    resolves, the versioned prompt's fields are combined into one prompt
    model; `expert_framework` is prefixed to the instructions. Image and
    audio prompts are appended when the input carries them.

    Args:
        agent_input (dict): agent input; `expert_framework`, `image_urls`
            and `audio_url` are popped from it.
        **kwargs: an optional `prompt_version` overrides the profile's value.
    Returns:
        ChatPrompt: the assembled chat prompt.
    Raises:
        Exception: when neither a versioned prompt nor profile prompt
            fields are available.
    """
    expert_framework = agent_input.pop('expert_framework', '') or ''

    profile: dict = self.agent_model.profile

    # Only prefix the framework when the profile actually has an instruction.
    profile_instruction = profile.get('instruction')
    if profile_instruction:
        profile_instruction = expert_framework + profile_instruction

    profile_prompt_model: AgentPromptModel = AgentPromptModel(introduction=profile.get('introduction'),
                                                              target=profile.get('target'),
                                                              instruction=profile_instruction)

    # get the prompt by the prompt version
    prompt_version = kwargs.get('prompt_version') or profile.get('prompt_version')
    version_prompt: Prompt = PromptManager().get_instance_obj(prompt_version)

    if version_prompt is None and not profile_prompt_model:
        raise Exception("Either the `prompt_version` or `introduction & target & instruction`"
                        " in agent profile configuration should be provided.")
    if version_prompt:
        # `or ''`: an instruction attribute set to None must not break the
        # string concatenation.
        version_instruction = getattr(version_prompt, 'instruction', '') or ''
        version_prompt_model: AgentPromptModel = AgentPromptModel(
            introduction=getattr(version_prompt, 'introduction', ''),
            target=getattr(version_prompt, 'target', ''),
            instruction=expert_framework + version_instruction)
        profile_prompt_model = profile_prompt_model + version_prompt_model

    chat_prompt = ChatPrompt().build_prompt(profile_prompt_model, ['introduction', 'target', 'instruction'])

    image_urls: list = agent_input.pop('image_urls', []) or []
    if image_urls:
        chat_prompt.generate_image_prompt(image_urls)

    # Default added so a missing key no longer raises KeyError, matching
    # how `image_urls` is popped above.
    audio_url: str = agent_input.pop('audio_url', '') or ''
    if audio_url:
        chat_prompt.generate_audio_prompt(audio_url)
    return chat_prompt
|
|
|
|
def get_memory_params(self, agent_input: dict) -> dict:
    """Assemble the keyword arguments used to query or summarize memory.

    Args:
        agent_input (dict): agent input; supplies `session_id` and `input`.
    Returns:
        dict: always `session_id`, `agent_id`, `prune` and `top_k`; plus
        `memory_types` and `input` when set, and `type` restricted to
        input/output when only a conversation memory is configured.
    """
    memory_conf = self.agent_model.memory

    # The framework context is the fallback source for the session id.
    session_id = agent_input.get('session_id')
    agent_id = self.agent_model.info.get('name')
    if not session_id:
        session_id = FrameworkContextManager().get_context('session_id')
    # An `agent_id` in the memory config overrides the agent's own name.
    if "agent_id" in memory_conf:
        agent_id = memory_conf.get('agent_id')

    params = {
        'session_id': session_id,
        'agent_id': agent_id,
        'prune': memory_conf.get('prune', False),
        'top_k': memory_conf.get('top_k', 20)
    }

    memory_types = memory_conf.get('memory_types', None)
    if memory_types:
        params['memory_types'] = memory_types

    user_input = agent_input.get('input')
    if user_input:
        params['input'] = user_input

    only_conversation_memory = not memory_conf.get('name') and memory_conf.get('conversation_memory')
    if only_conversation_memory:
        params["type"] = ['input', 'output']
    return params
|
|
|
|
def get_run_config(self, **kwargs) -> dict:
    """Build the runnable config used when invoking chains.

    Args:
        **kwargs: an optional `llm_name` overrides the name configured under
            `llm_model.name` in the agent profile.
    Returns:
        dict: a `RunnableConfig` whose only callback is an
        `InvokeCallbackHandler` tagged with the agent name and LLM name.
    """
    llm_name = kwargs.get('llm_name')
    if not llm_name:
        llm_name = self.agent_model.profile.get('llm_model', {}).get('name')

    invoke_handler = InvokeCallbackHandler(
        source=self.agent_model.info.get('name'),
        llm_name=llm_name
    )
    return RunnableConfig(callbacks=[invoke_handler])
|
|
|
|
def collect_current_memory(self, collect_type: str) -> bool:
    """Decide whether memory of the given type should be collected.

    Args:
        collect_type (str): the type of memory about to be collected.
    Returns:
        bool: False when `auto_trace` is disabled in the memory config, or
        when `collection_types` is set and does not contain `collect_type`;
        True otherwise.
    """
    memory_conf = self.agent_model.memory
    allowed_types = memory_conf.get('collection_types')
    if not memory_conf.get('auto_trace', True):
        return False
    # An empty/missing allow-list means every type is collected.
    return not allowed_types or collect_type in allowed_types
|
|
|
|
def load_memory(self, memory, agent_input: dict):
    """Load the chat history from memory as a string.

    Args:
        memory: the memory instance to read; may be falsy.
        agent_input (dict): agent input; on success the history string is
            also stored in it under `memory.memory_key`.
    Returns:
        str: the rendered history, or a fixed placeholder sentence when
        `memory` is falsy (in which case `agent_input` is left untouched).
    """
    if not memory:
        return "Up to Now, No Chat History"

    params = self.get_memory_params(agent_input)
    LOGGER.info(f"Load memory with params: {params}")
    history = memory.get(**params)
    history_text = get_memory_string(history, agent_input.get('agent_id'))

    agent_input[memory.memory_key] = history_text
    return history_text
|
|
|
|
def add_memory(self, memory: Memory, content: Any, type: str = 'Q&A',
               agent_input: Optional[dict[str, Any]] = None):
    """Store one message in the given memory.

    Args:
        memory (Memory): the target memory; nothing happens when it is falsy.
        content (Any): the message content; non-string content is serialized
            with `json.dumps(..., ensure_ascii=False)`.
        type (str): the message type, also written to the metadata.
        agent_input (dict, optional): agent input supplying `session_id`;
            the framework context is used when it has none.
    """
    if not memory:
        return
    # None instead of a shared mutable `{}` default.
    if agent_input is None:
        agent_input = {}

    session_id = agent_input.get('session_id')
    if not session_id:
        session_id = FrameworkContextManager().get_context('session_id')
    agent_id = self.agent_model.info.get('name')

    # Take the clock once so `timestamp` and `gmt_created` describe the
    # same instant.
    created_at = datetime.now()
    message = Message(id=str(uuid.uuid4().hex),
                      source=agent_id,
                      content=content if isinstance(content, str) else json.dumps(content, ensure_ascii=False),
                      type=type,
                      metadata={
                          'agent_id': agent_id,
                          'session_id': session_id,
                          'type': type,
                          'timestamp': created_at,
                          'gmt_created': created_at.isoformat()
                      })
    memory.add([message], session_id=session_id, agent_id=agent_id)
|
|
|
|
def summarize_memory(self, agent_input: Optional[dict[str, Any]] = None, memory: Memory = None):
    """Summarize the memory in a background thread and store the summary.

    Args:
        agent_input (dict, optional): agent input used to build the memory
            query parameters.
        memory (Memory): the memory to summarize; nothing happens when it
            is falsy.
    """
    if not memory:
        return
    # None instead of a shared mutable `{}` default.
    if agent_input is None:
        agent_input = {}

    def do_summarize(params):
        # Runs on the worker thread: summarize, then persist the summary as
        # a `summarize`-typed message for the same session and agent.
        content = memory.summarize_memory(**params)
        memory.add([
            Message(
                id=str(uuid.uuid4().hex),
                source=self.agent_model.info.get('name'),
                content=content,
                type='summarize'
            )
        ], session_id=params['session_id'], agent_id=params['agent_id'])

    params = self.get_memory_params(agent_input)
    Thread(target=do_summarize, args=(params,)).start()
|
|
|
|
def load_summarize_memory(self, memory: Memory, agent_input: Optional[dict[str, Any]] = None) -> str:
    """Return the most recent stored memory summary.

    Args:
        memory (Memory): the memory to read; may be falsy.
        agent_input (dict, optional): agent input used to build the memory
            query parameters.
    Returns:
        str: the content of the last `summarize` message, or a fixed
        placeholder sentence when there is no memory or no summary.
    """
    no_summary = "Up to Now, No Summarize Memory"
    if not memory:
        return no_summary
    # None instead of a shared mutable `{}` default.
    if agent_input is None:
        agent_input = {}

    params = self.get_memory_params(agent_input)
    params['type'] = 'summarize'
    summaries = memory.get(**params)
    # Truthiness also covers a memory backend returning None.
    if not summaries:
        return no_summary
    return summaries[-1].content
|
|
|
|
def get_tool_descriptions(self) -> List[str]:
    """Describe every tool available to the agent.

    Returns:
        List[str]: one `tool name:.../tool description:...` entry per
        configured tool that can be resolved, followed by the
        `tool_descriptions` of every configured toolkit that can be resolved.
    """
    action = self.agent_model.action
    description_list = []

    # `or []` also covers a key that is present but set to None.
    for tool_name in action.get('tool') or []:
        tool_ins = ToolManager().get_instance_obj(tool_name)
        if tool_ins is None:
            # Unknown tool: skip it, as invoke_tools does.
            continue
        description_list.append(f'tool name:{tool_ins.name}\ntool description:{tool_ins.description}\n')

    for toolkit_name in action.get('toolkit') or []:
        toolkit = ToolkitManager().get_instance_obj(toolkit_name)
        if toolkit is None:
            continue
        description_list.extend(toolkit.tool_descriptions)

    return description_list
|
|
|
|
def get_func_call_list(self) -> List[str]:
    """Placeholder hook; the base implementation does no work and yields None."""
    return None
|
|
|
|
def create_copy(self):
    """Copy the agent, giving the copy its own deep-copied agent model.

    Returns:
        The result of `self.model_copy()`; when this agent has an agent
        model, the copy's `agent_model` is replaced by a deep copy of it.
    """
    duplicate = self.model_copy()
    source_model = self.agent_model
    if source_model is None:
        return duplicate
    duplicate.agent_model = source_model.model_copy(deep=True)
    return duplicate
|