feature: add productization capabilities for agentUniverse.

This commit is contained in:
wangchongshi
2024-08-02 10:52:49 +08:00
parent fa28937485
commit d7b2af75e4
12 changed files with 321 additions and 30 deletions

View File

@@ -13,6 +13,7 @@ from pydantic import BaseModel
from langchain.tools import Tool as LangchainTool
from agentuniverse.agent.action.tool.enum import ToolTypeEnum
from agentuniverse.base.annotation.trace import trace_tool
from agentuniverse.base.component.component_base import ComponentBase
from agentuniverse.base.component.component_enum import ComponentEnum
from agentuniverse.base.config.application_configer.application_config_manager import ApplicationConfigManager
@@ -61,6 +62,7 @@ class Tool(ComponentBase):
def __init__(self, **kwargs):
super().__init__(component_type=ComponentEnum.TOOL, **kwargs)
@trace_tool
def run(self, **kwargs):
"""The callable method that runs the tool."""
self.input_check(kwargs)
@@ -73,6 +75,7 @@ class Tool(ComponentBase):
if key not in kwargs.keys():
raise Exception(f'{self.get_instance_code()} - The input must include key: {key}.')
@trace_tool
def langchain_run(self, *args, callbacks=None, **kwargs):
"""The callable method that runs the tool."""
kwargs["callbacks"] = callbacks

View File

@@ -5,21 +5,23 @@
# @Email : lc299034@antgroup.com
# @FileName: executing_agent.py
"""Executing Agent module."""
import copy
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from typing import Optional, Any
from agentuniverse.agent.action.tool.tool_manager import ToolManager
from agentuniverse.agent.agent import Agent
from agentuniverse.agent.agent_model import AgentModel
from agentuniverse.agent.input_object import InputObject
from agentuniverse.agent.plan.planner.planner import Planner
from agentuniverse.agent.plan.planner.planner_manager import PlannerManager
from agentuniverse.base.context.framework_context_manager import FrameworkContextManager
class ExecutingAgent(Agent):
"""Executing Agent class."""
executor: Optional[Any] = ThreadPoolExecutor(max_workers=10, thread_name_prefix="executing_agent")
_context_values: Optional[dict] = {}
def input_keys(self) -> list[str]:
"""Return the input keys of the Agent."""
@@ -73,6 +75,7 @@ class ExecutingAgent(Agent):
Returns:
dict: Agent result object.
"""
self._context_values: dict = FrameworkContextManager().get_all_contexts()
framework = agent_input.get('framework', [])
futures = []
for task in framework:
@@ -81,11 +84,24 @@ class ExecutingAgent(Agent):
agent_input_copy['input'] = task
planner: Planner = PlannerManager().get_instance_obj(self.agent_model.plan.get('planner').get('name'))
futures.append(
self.executor.submit(planner.invoke, self.agent_model, agent_input_copy,
self.executor.submit(self.run_in_executor, planner, self.agent_model, agent_input_copy,
self.process_input_object(input_object, task, planner.input_key)))
wait(futures, return_when=ALL_COMPLETED)
return {'futures': futures}
def run_in_executor(self, planner: Planner, agent_model: AgentModel, planner_input: dict,
input_object: InputObject) -> dict:
"""Install the caller's framework contexts in the worker thread, invoke the planner, then reset them."""
context_tokens = {}
try:
for var_name, var_value in self._context_values.items():
token = FrameworkContextManager().set_context(var_name, var_value)
context_tokens[var_name] = token
res = planner.invoke(agent_model, planner_input, input_object)
return res
finally:
for var_name, token in context_tokens.items():
FrameworkContextManager().reset_context(var_name, token)
def process_input_object(self, input_object: InputObject, subtask: str, planner_input_key: str) -> InputObject:
"""Process input object for the executing agent.

View File

@@ -5,7 +5,6 @@
# @Author : fanen.lhy
# @Email : fanen.lhy@antgroup.com
# @FileName: request_task.py
import enum
from enum import Enum
import json
@@ -125,6 +124,7 @@ class RequestTask:
def stream_run(self):
"""Run the service in a separate thread and yield result stream."""
self.kwargs['output_stream'] = self.queue
self.thread = ThreadWithReturnValue(target=self.func,
kwargs=self.kwargs)
self.thread.start()
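stream_run injects a queue into the target's kwargs and starts the worker thread, letting the caller consume chunks while the agent is still running; the EOF sentinel that ends iteration is put by the worker's finally block (next file). A self-contained sketch of this producer/consumer shape, with illustrative names:

import queue
from threading import Thread

EOF = '{"type": "EOF"}'

def stream_run(func, **kwargs):
    """Start func in a worker thread and yield whatever it puts on the queue."""
    q = queue.Queue()
    kwargs['output_stream'] = q
    Thread(target=func, kwargs=kwargs).start()
    while True:
        chunk = q.get()
        if chunk == EOF:
            return
        yield chunk

def producer(output_stream):
    for i in range(3):
        output_stream.put(f'chunk-{i}')
    output_stream.put(EOF)  # sentinel, mirrors the finally block

print(list(stream_run(producer)))  # ['chunk-0', 'chunk-1', 'chunk-2']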

View File

@@ -8,9 +8,12 @@
from threading import Thread
from agentuniverse.base.context.framework_context_manager import FrameworkContextManager
class ThreadWithReturnValue(Thread):
"""A thread can save the target func exec result."""
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None):
super().__init__(group, target, name, args, kwargs)
@@ -22,10 +25,16 @@ class ThreadWithReturnValue(Thread):
self.target = target
self._return = None
self.error = None
self._context_values: dict = FrameworkContextManager().get_all_contexts()
self._context_tokens = {}
def run(self):
"""Run the target func and save result in _return."""
if self.target is not None:
for var_name, var_value in self._context_values.items():
token = FrameworkContextManager().set_context(var_name, var_value)
self._context_tokens[var_name] = token
try:
self._return = self.target(*self.args, **self.kwargs)
except Exception as e:
@@ -33,6 +42,8 @@ class ThreadWithReturnValue(Thread):
finally:
if 'output_stream' in self.kwargs:
self.kwargs['output_stream'].put('{"type": "EOF"}')
for var_name, token in self._context_tokens.items():
FrameworkContextManager().reset_context(var_name, token)
def result(self):
"""Wait for target func finished, then return the result or raise an

View File

@@ -8,12 +8,10 @@
import asyncio
import functools
import inspect
import json
from functools import wraps
from agentuniverse.agent.input_object import InputObject
from agentuniverse.agent.output_object import OutputObject
from agentuniverse.base.context.framework_context_manager import FrameworkContextManager
from agentuniverse.base.util.monitor.monitor import Monitor
from agentuniverse.llm.llm_output import LLMOutput
@@ -28,11 +26,24 @@ def trace_llm(func):
async def wrapper_async(*args, **kwargs):
# get llm input from arguments
llm_input = _get_llm_input(func, *args, **kwargs)
source = func.__qualname__
# check whether the tracing switch is enabled
self = llm_input.pop('self', None)
if self and hasattr(self, 'name'):
name = self.name
if name is not None:
source = name
# add invocation chain to the monitor module.
Monitor.add_invocation_chain(source=source)
if self and hasattr(self, 'tracing'):
if self.tracing is False:
return await func(*args, **kwargs)
# invoke function
result = await func(*args, **kwargs)
# not streaming
@@ -55,19 +66,36 @@ def trace_llm(func):
@functools.wraps(func)
def wrapper_sync(*args, **kwargs):
# get llm input from arguments
llm_input = _get_llm_input(func, *args, **kwargs)
source = func.__qualname__
# check whether the tracing switch is enabled
self = llm_input.pop('self', None)
if self and hasattr(self, 'name'):
name = self.name
if name is not None:
source = name
# add invocation chain to the monitor module.
Monitor.add_invocation_chain(source=source)
if self and hasattr(self, 'tracing'):
if self.tracing is False:
return func(*args, **kwargs)
# invoke function
result = func(*args, **kwargs)
# not streaming
if isinstance(result, LLMOutput):
# add llm invocation info to monitor
Monitor().trace_llm_invocation(source=func.__qualname__, llm_input=llm_input, llm_output=result.text)
# add llm token usage to monitor
trace_llm_token_usage(self, llm_input, result.text)
return result
else:
# streaming
@@ -76,9 +104,15 @@ def trace_llm(func):
for chunk in result:
llm_output.append(chunk.text)
yield chunk
output_str = "".join(llm_output)
# add llm invocation info to monitor
Monitor().trace_llm_invocation(source=func.__qualname__, llm_input=llm_input,
llm_output="".join(llm_output))
llm_output=output_str)
# add llm token usage to monitor
trace_llm_token_usage(self, llm_input, output_str)
return gen_iterator()
@@ -123,6 +157,9 @@ def trace_agent(func):
if isinstance(profile, dict):
tracing = profile.get('tracing', None)
# add invocation chain to the monitor module.
Monitor.add_invocation_chain(source=source)
if tracing is False:
return func(*args, **kwargs)
@@ -136,9 +173,83 @@ def trace_agent(func):
return wrapper_sync
def trace_tool(func):
"""Annotation: @trace_tool
Decorator to trace the tool invocation.
"""
@functools.wraps(func)
def wrapper_sync(*args, **kwargs):
# get tool input from arguments
tool_input = _get_agent_input(func, *args, **kwargs)
source = func.__qualname__
self = tool_input.pop('self', None)
if self is not None:
name = getattr(self, 'name', None)
if name is not None:
source = name
# add invocation chain to the monitor module.
Monitor.add_invocation_chain(source=source)
# invoke function
return func(*args, **kwargs)
# sync function
return wrapper_sync
def _get_agent_input(func, *args, **kwargs) -> dict:
"""Get the agent input from arguments."""
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
return {k: v for k, v in bound_args.arguments.items()}
def _get_llm_token_usage(llm_obj: object, llm_input: dict, output_str: str) -> dict:
if llm_obj is None or llm_input is None:
return {}
messages = llm_input.pop('messages', None)
input_str = ''
if messages is not None and isinstance(messages, list):
for m in messages:
if isinstance(m, dict):
input_str += str(m.get('role', '')) + '\n'
input_str += str(m.get('content', '')) + '\n'
else:
if hasattr(m, 'role'):
role = m.role
if role is not None:
input_str += str(m.role) + '\n'
if hasattr(m, 'content'):
content = m.content
if content is not None:
input_str += str(m.content) + '\n'
if input_str == '' or output_str == '':
return {}
usage = {}
if hasattr(llm_obj, 'get_num_tokens'):
completion_tokens = llm_obj.get_num_tokens(output_str)
prompt_tokens = llm_obj.get_num_tokens(input_str)
total_tokens = completion_tokens + prompt_tokens
usage = {'completion_tokens': completion_tokens, 'prompt_tokens': prompt_tokens,
'total_tokens': total_tokens}
return usage
def trace_llm_token_usage(llm_obj: object, llm_input: dict, output_str: str):
trace_id = FrameworkContextManager().get_context('trace_id')
# trace token usage for a complete request chain based on trace id
if trace_id:
token_usage: dict = _get_llm_token_usage(llm_obj, llm_input, output_str)
if token_usage:
Monitor.add_token_usage(token_usage)
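_get_llm_token_usage estimates counts by re-tokenizing the flattened prompt and the generated text with the model's own get_num_tokens, and trace_llm_token_usage merges them into per-trace totals. The merge step in isolation, with a toy dict in place of the context-backed storage:

_usage_by_trace = {}  # toy stand-in for the trace-keyed context storage

def add_token_usage(trace_id, cur):
    """Merge one LLM call's token counts into the running totals for the trace."""
    totals = _usage_by_trace.setdefault(trace_id, {})
    for key, value in cur.items():
        totals[key] = totals.get(key, 0) + value

add_token_usage('t1', {'prompt_tokens': 12, 'completion_tokens': 30, 'total_tokens': 42})
add_token_usage('t1', {'prompt_tokens': 8, 'completion_tokens': 10, 'total_tokens': 18})
print(_usage_by_trace['t1'])
# {'prompt_tokens': 20, 'completion_tokens': 40, 'total_tokens': 60}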

View File

@@ -5,7 +5,6 @@
# @Author : fanen.lhy
# @Email : fanen.lhy@antgroup.com
# @FileName: framework_context_manager.py
from contextvars import ContextVar, Token
import threading
from typing import Dict, Any
@@ -84,3 +83,15 @@ class FrameworkContextManager:
Token used to reset a context variable.
"""
self.__context_dict[var_name].reset(token)
def get_all_contexts(self) -> Dict[str, Any]:
"""Get all context variables and their values."""
context_values = {}
for var_name in self.__context_dict.keys():
context_values[var_name] = self.get_context(var_name)
return context_values
def set_all_contexts(self, context_values: Dict[str, Any]):
"""Set all context variables using the provided dictionary."""
for var_name, var_value in context_values.items():
self.set_context(var_name, var_value)
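get_all_contexts and set_all_contexts let one thread snapshot its context variables and another replay them, which is exactly what ExecutingAgent and ThreadWithReturnValue rely on above. A toy registry showing the round-trip, assuming the manager wraps one ContextVar per name as the token-based reset API suggests:

import contextvars

class MiniContextManager:
    """Toy registry of named ContextVars, mirroring the snapshot/replay API."""
    def __init__(self):
        self._vars = {}

    def set_context(self, name, value):
        var = self._vars.setdefault(name, contextvars.ContextVar(name, default=None))
        return var.set(value)  # token, usable for a later reset

    def get_context(self, name):
        var = self._vars.get(name)
        return var.get() if var is not None else None

    def get_all_contexts(self):
        return {name: var.get() for name, var in self._vars.items()}

    def set_all_contexts(self, values):
        for name, value in values.items():
            self.set_context(name, value)

mgr = MiniContextManager()
mgr.set_context('trace_id', 'abc')
snapshot = mgr.get_all_contexts()   # {'trace_id': 'abc'}
mgr.set_all_contexts(snapshot)      # replaying the snapshot is idempotent
print(mgr.get_context('trace_id'))  # abc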

View File

@@ -8,6 +8,7 @@
import datetime
import json
import os
import uuid
from typing import Union, Optional
from pydantic import BaseModel
@@ -16,6 +17,7 @@ from agentuniverse.agent.input_object import InputObject
from agentuniverse.agent.output_object import OutputObject
from agentuniverse.base.annotation.singleton import singleton
from agentuniverse.base.config.configer import Configer
from agentuniverse.base.context.framework_context_manager import FrameworkContextManager
LLM_INVOCATION_SUBDIR = "llm_invocation"
AGENT_INVOCATION_SUBDIR = "agent_invocation"
@@ -86,6 +88,71 @@ class Monitor(BaseModel):
with jsonlines.open(path_save, 'a') as writer:
writer.write(agent_invocation)
@staticmethod
def init_trace_id():
if FrameworkContextManager().get_context('trace_id') is None:
FrameworkContextManager().set_context('trace_id', str(uuid.uuid4()))
@staticmethod
def init_invocation_chain():
Monitor.init_trace_id()
trace_id = FrameworkContextManager().get_context('trace_id')
if FrameworkContextManager().get_context(trace_id + '_invocation_chain') is None:
FrameworkContextManager().set_context(trace_id + '_invocation_chain', [])
@staticmethod
def clear_invocation_chain():
trace_id = FrameworkContextManager().get_context('trace_id')
if trace_id is not None:
FrameworkContextManager().del_context(trace_id + '_invocation_chain')
FrameworkContextManager().del_context('trace_id')
@staticmethod
def add_invocation_chain(source: str):
trace_id = FrameworkContextManager().get_context('trace_id')
if trace_id is not None:
invocation_chain = FrameworkContextManager().get_context(trace_id + '_invocation_chain')
if invocation_chain is not None:
invocation_chain.append(source)
FrameworkContextManager().set_context(trace_id + '_invocation_chain', invocation_chain)
@staticmethod
def get_trace_id():
return FrameworkContextManager().get_context('trace_id')
@staticmethod
def get_invocation_chain():
trace_id = FrameworkContextManager().get_context('trace_id')
return FrameworkContextManager().get_context(trace_id + '_invocation_chain', []) if trace_id is not None else []
@staticmethod
def init_token_usage():
Monitor.init_trace_id()
trace_id = FrameworkContextManager().get_context('trace_id')
if FrameworkContextManager().get_context(trace_id + '_token_usage') is None:
FrameworkContextManager().set_context(trace_id + '_token_usage', {})
@staticmethod
def add_token_usage(cur_token_usage: dict):
trace_id = FrameworkContextManager().get_context('trace_id')
if trace_id is not None:
old_token_usage: dict = FrameworkContextManager().get_context(trace_id + '_token_usage')
for key, value in cur_token_usage.items():
old_token_usage[key] = old_token_usage[key] + value if key in old_token_usage else value
FrameworkContextManager().set_context(trace_id + '_token_usage', old_token_usage)
@staticmethod
def clear_token_usage():
trace_id = FrameworkContextManager().get_context('trace_id')
if trace_id is not None:
FrameworkContextManager().del_context(trace_id + '_token_usage')
FrameworkContextManager().del_context('trace_id')
@staticmethod
def get_token_usage():
trace_id = FrameworkContextManager().get_context('trace_id')
return FrameworkContextManager().get_context(trace_id + '_token_usage', {}) if trace_id is not None else {}
def _get_or_create_subdir(self, subdir: str) -> str:
"""Get or create a subdirectory if it doesn't exist in the monitor directory."""
path = os.path.join(self.dir, subdir)
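Taken together, the new statics define a per-request lifecycle: initialize when a request arrives, append sources and token counts while agents, tools, and LLMs run, read the results, then clear. A hypothetical call sequence, assuming the statics behave exactly as defined in this hunk:

from agentuniverse.base.util.monitor.monitor import Monitor

Monitor.init_invocation_chain()  # also creates a trace_id if one is absent
Monitor.init_token_usage()

Monitor.add_invocation_chain('demo_agent')
Monitor.add_invocation_chain('demo_tool')
Monitor.add_token_usage({'prompt_tokens': 10, 'completion_tokens': 5, 'total_tokens': 15})

print(Monitor.get_invocation_chain())  # ['demo_agent', 'demo_tool']
print(Monitor.get_token_usage())       # {'prompt_tokens': 10, 'completion_tokens': 5, 'total_tokens': 15}

# read everything before clearing: both clear methods also delete trace_id
Monitor.clear_invocation_chain()
Monitor.clear_token_usage()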

View File

@@ -17,6 +17,7 @@ from agentuniverse.agent.output_object import OutputObject
from agentuniverse.agent_serve.web.request_task import RequestTask
from agentuniverse.agent_serve.web.web_util import agent_run_queue
from agentuniverse.base.component.component_enum import ComponentEnum
from agentuniverse.base.util.monitor.monitor import Monitor
from agentuniverse.llm.llm import LLM
from agentuniverse.llm.llm_manager import LLMManager
from agentuniverse.prompt.prompt import Prompt
@@ -95,6 +96,11 @@ class AgentService:
agent: Agent = AgentManager().get_instance_obj(agent_id)
if agent is None:
raise ValueError("The agent instance corresponding to the agent id cannot be found.")
# init the invocation chain and token usage of the monitor module
Monitor.init_invocation_chain()
Monitor.init_token_usage()
# invoke agent
start_time = time.time()
output_object: OutputObject = agent.run(input=input,
@@ -104,12 +110,19 @@ class AgentService:
response_time = round((end_time - start_time) * 1000, 2)
output = output_object.get_data('output')
# get and clear invocation chain and token usage.
invocation_chain = Monitor.get_invocation_chain()
token_usage = Monitor.get_token_usage()
Monitor.clear_invocation_chain()
Monitor.clear_token_usage()
# add agent chat history
session_id, message_id = AgentService().add_agent_chat_history(agent_id, session_id, input, output)
return {'response_time': response_time, 'message_id': message_id, 'session_id': session_id, 'output': output,
'start_time': datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H:%M:%S"),
'end_time': datetime.fromtimestamp(end_time).strftime("%Y-%m-%d %H:%M:%S")}
'end_time': datetime.fromtimestamp(end_time).strftime("%Y-%m-%d %H:%M:%S"),
'invocation_chain': invocation_chain, 'token_usage': token_usage}
@staticmethod
def stream_chat(agent_id: str, session_id: str, input: str) -> Iterator:
@@ -119,6 +132,10 @@ class AgentService:
if agent is None:
raise ValueError("The agent instance corresponding to the agent id cannot be found.")
# init the invocation chain and token usage of the monitor module
Monitor.init_invocation_chain()
Monitor.init_token_usage()
# invoke agent
start_time = time.time()
task = RequestTask(agent_run_queue, False,
@@ -128,6 +145,7 @@ class AgentService:
output_iterator = task.stream_run()
final_result: dict = dict()
error_result: dict = dict()
# generate iterator
for chunk in output_iterator:
chunk = chunk.replace("data:", "", 1)
@@ -140,20 +158,32 @@ class AgentService:
yield {'output': data['output'], 'type': 'intermediate_steps'}
elif "result" in chunk_dict:
final_result = chunk_dict['result']
elif "error" in chunk_dict:
error_result = chunk_dict['error']
end_time = time.time()
# calculate response time
response_time = round((end_time - start_time) * 1000, 2)
output = final_result.get('output')
if len(final_result) > 0:
output = final_result.get('output')
# add agent chat history
session_id, message_id = AgentService().add_agent_chat_history(agent_id, session_id, input, output)
# return final yield
yield {'response_time': response_time, 'message_id': message_id, 'session_id': session_id, 'output': output,
'start_time': datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H:%M:%S"),
'end_time': datetime.fromtimestamp(end_time).strftime("%Y-%m-%d %H:%M:%S"), 'type': 'final_result'}
# get and clear invocation chain and token usage.
invocation_chain = Monitor.get_invocation_chain()
token_usage = Monitor.get_token_usage()
Monitor.clear_invocation_chain()
Monitor.clear_token_usage()
# return final yield
yield {'response_time': response_time, 'message_id': message_id, 'session_id': session_id, 'output': output,
'start_time': datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H:%M:%S"),
'end_time': datetime.fromtimestamp(end_time).strftime("%Y-%m-%d %H:%M:%S"),
'invocation_chain': invocation_chain, 'token_usage': token_usage, 'type': 'final_result'}
else:
yield {'error': error_result, 'type': 'error'}
@staticmethod
def get_planner_dto(agent_model: AgentModel) -> PlannerDTO | None:
@@ -270,7 +300,7 @@ class AgentService:
@staticmethod
def get_agent_chat_history(session_id: str) -> list:
session_dto: SessionDTO = SessionService().get_session_detail(session_id, 10)
session_dto: SessionDTO = SessionService().get_session_detail(session_id, 5)
chat_history = []
if session_dto:
messages: List[MessageDTO] = session_dto.messages
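With these changes the final payload of chat (and the final yield of stream_chat) carries the collected trace data alongside the usual fields. An illustrative response shape; every value below is made up:

response = {
    'response_time': 1532.18,  # milliseconds
    'message_id': 7,
    'session_id': 'b1f0c2e4',
    'output': 'final answer text',
    'start_time': '2024-08-02 10:52:49',
    'end_time': '2024-08-02 10:52:51',
    'invocation_chain': ['demo_agent', 'demo_tool', 'demo_llm'],
    'token_usage': {'completion_tokens': 120, 'prompt_tokens': 480, 'total_tokens': 600},
}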

View File

@@ -46,7 +46,7 @@ class DiscussionPlanner(Planner):
# generate participant agents
participant_agents = self.generate_participant_agents(planner_config)
# invoke agents
return self.agents_run(participant_agents, planner_config, agent_model, planner_input)
return self.agents_run(participant_agents, planner_config, agent_model, planner_input, input_object)
@staticmethod
def generate_participant_agents(planner_config: dict) -> dict:
@@ -61,7 +61,7 @@ class DiscussionPlanner(Planner):
return agents
def agents_run(self, participant_agents: dict, planner_config: dict, agent_model: AgentModel,
agent_input: dict) -> dict:
agent_input: dict, input_object: InputObject) -> dict:
""" Invoke the participant agents and host agent.
Args:
@@ -69,6 +69,7 @@ class DiscussionPlanner(Planner):
planner_config (dict): Planner config.
agent_model (AgentModel): Agent model object.
agent_input (dict): Agent input object.
input_object (InputObject): The input parameters passed by the user.
Returns:
dict: The planner result.
"""
@@ -76,8 +77,11 @@ class DiscussionPlanner(Planner):
chat_history = []
LOGGER.info(f"The topic of discussion is {agent_input.get(self.input_key)}")
LOGGER.info(f"The participant agents are {'|'.join(participant_agents.keys())}")
agent_input['total_round'] = total_round
agent_input['participants'] = ' and '.join(participant_agents.keys())
input_object.add_data('chat_history', chat_history)
input_object.add_data('total_round', total_round)
input_object.add_data('participants', ' and '.join(participant_agents.keys()))
for i in range(total_round):
LOGGER.info("------------------------------------------------------------------")
LOGGER.info(f"Start a discussion, round is {i + 1}.")
@@ -86,29 +90,39 @@ class DiscussionPlanner(Planner):
LOGGER.info(f"Start speaking: agent is {agent_name}.")
LOGGER.info("------------------------------------------------------------------")
# invoke participant agent
agent_input['agent_name'] = agent_name
agent_input['cur_round'] = i + 1
output_object: OutputObject = agent.run(**agent_input)
input_object.add_data('agent_name', agent_name)
input_object.add_data('cur_round', i + 1)
output_object: OutputObject = agent.run(**input_object.to_dict())
current_output = output_object.get_data('output', '')
# process chat history
chat_history.append({'content': agent_input.get('input'), 'type': 'human'})
chat_history.append(
{'content': f'the round {i + 1} agent {agent_name} thought: {current_output}', 'type': 'ai'})
agent_input['chat_history'] = chat_history
input_object.add_data('chat_history', chat_history)
# add to the stream queue.
self.stream_output(input_object, {"data": {
'output': current_output,
"agent_info": agent_model.info
}, "type": "participant_agent"})
LOGGER.info(f"the round {i + 1} agent {agent_name} thought: {output_object.get_data('output', '')}")
# concatenate the agent input parameters of the host agent.
agent_input['chat_history'] = chat_history
agent_input['total_round'] = total_round
agent_input['participants'] = ' and '.join(participant_agents.keys())
# finally invoke host agent
return self.invoke_host_agent(agent_model, agent_input)
return self.invoke_host_agent(agent_model, agent_input, input_object)
def invoke_host_agent(self, agent_model: AgentModel, planner_input: dict) -> dict:
def invoke_host_agent(self, agent_model: AgentModel, planner_input: dict, input_object: InputObject) -> dict:
""" Invoke the host agent.
Args:
agent_model (AgentModel): Agent model object.
planner_input (dict): Planner input object.
input_object (InputObject): The input parameters passed by the user.
Returns:
dict: The planner result.
"""
@@ -131,8 +145,7 @@ class DiscussionPlanner(Planner):
history_messages_key="chat_history",
input_messages_key=self.input_key,
) | StrOutputParser()
res = asyncio.run(
chain_with_history.ainvoke(input=planner_input, config={"configurable": {"session_id": "unused"}}))
res = self.invoke_chain(agent_model, chain_with_history, planner_input, chat_history, input_object)
LOGGER.info(f"Discussion summary is: {res}")
return {**planner_input, self.output_key: res, 'chat_history': generate_memories(chat_history)}
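The planner now threads discussion state through input_object: a fixed number of rounds in which each participant speaks once, every utterance appended to a shared chat history that the host agent finally summarizes. A toy version of the round-robin loop, with participants stubbed as plain functions:

agents = {
    'agent_a': lambda question, history: f'a has read {len(history)} messages',
    'agent_b': lambda question, history: f'b has read {len(history)} messages',
}
chat_history, total_round, question = [], 2, 'What should we build?'
for i in range(total_round):
    for name, agent in agents.items():
        answer = agent(question, chat_history)
        chat_history.append({'content': question, 'type': 'human'})
        chat_history.append(
            {'content': f'the round {i + 1} agent {name} thought: {answer}', 'type': 'ai'})
print(len(chat_history))  # 8: 2 rounds x 2 agents x 2 entries each
# a host agent would now summarize chat_history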

View File

@@ -0,0 +1,7 @@
id: host_agent
nickname: A simple multi-agent discussion group example
type: AGENT
metadata:
class: AgentProduct
module: agentuniverse_product.base.agent_product
type: PRODUCT

View File

@@ -0,0 +1,7 @@
id: demo_react_agent
nickname: A simple ReAct agent example
type: AGENT
metadata:
class: AgentProduct
module: agentuniverse_product.base.agent_product
type: PRODUCT

View File

@@ -27,6 +27,7 @@ if __name__ == '__main__':
for chunk in AgentService().stream_chat('demo_peer_agent', session_list[0].id,
'Why did Buffett reduce his holdings in BYD?'):
print(chunk)
print('----------------------------------')
agent_list: List[AgentDTO] = AgentService.get_agent_list()
@@ -42,3 +43,17 @@ if __name__ == '__main__':
tool_list: List[ToolDTO] = ToolService.get_tool_list()
llm_list: List[LlmDTO] = LLMService.get_llm_list()
print('----------------------------------')
SessionService().create_session('demo_rag_agent')
session_list: List[SessionDTO] = SessionService.get_session_list('demo_rag_agent')
for chunk in AgentService().stream_chat('demo_rag_agent', session_list[0].id,
'Why did Buffett reduce his holdings in BYD?'):
print(chunk)
print('----------------------------------')
for chunk in AgentService().stream_chat('demo_rag_agent', session_list[0].id,
'What did I just ask?'):
print(chunk)
print('----------------------------------')