feat: add trace_llm annotation to trace the llm invocation record.

commit 1920e6a4c4 (parent 65fee7aea6)
Author: wangchongshi
Date:   2024-06-05 19:53:25 +08:00

18 changed files with 297 additions and 43 deletions

View File

@@ -6,7 +6,7 @@
 # @Email : wangchongshi.wcs@antgroup.com
 # @FileName: default_memory.py
 from agentuniverse.agent.memory.chat_memory import ChatMemory
-from agentuniverse.llm.openai_llm import OpenAILLM
+from agentuniverse.llm.default.default_openai_llm import DefaultOpenAILLM


 class DefaultMemory(ChatMemory):
@@ -24,4 +24,4 @@ class DefaultMemory(ChatMemory):
         default memory uses OpenAILLM(gpt-3.5-turbo) object as the memory llm.
         """
         super().__init__(**kwargs)
-        self.llm = OpenAILLM(model_name="gpt-3.5-turbo")
+        self.llm = DefaultOpenAILLM(model_name="gpt-4o")

View File

@@ -84,12 +84,11 @@ class Planner(ComponentBase):
         params['input_key'] = self.input_key
         params['output_key'] = self.output_key
-        memory: ChatMemory = MemoryManager().get_instance_obj(memory_name)
+        memory: ChatMemory = MemoryManager().get_instance_obj(component_instance_name=memory_name, new_instance=True)
         if memory is None:
             return None
-        copied_memory = copy.deepcopy(memory)
-        copied_memory.set_by_agent_model(**params)
-        return copied_memory
+        memory.set_by_agent_model(**params)
+        return memory

     def handle_all_actions(self, agent_model: AgentModel, planner_input: dict, input_object: InputObject):
         """Tool and knowledge processing.
@@ -143,10 +142,9 @@ class Planner(ComponentBase):
             LLM: The language model.
         """
         llm_name = agent_model.profile.get('llm_model').get('name')
-        llm: LLM = LLMManager().get_instance_obj(llm_name)
-        copied_llm = copy.deepcopy(llm)
-        copied_llm.set_by_agent_model(**agent_model.profile.get('llm_model'))
-        return copied_llm
+        llm: LLM = LLMManager().get_instance_obj(component_instance_name=llm_name, new_instance=True)
+        llm.set_by_agent_model(**agent_model.profile.get('llm_model'))
+        return llm

     def initialize_by_component_configer(self, component_configer: PlannerConfiger) -> 'Planner':
         """Initialize the planner by the PlannerConfiger object.

View File

@@ -21,6 +21,7 @@ from agentuniverse.base.config.config_type_enum import ConfigTypeEnum
 from agentuniverse.base.config.configer import Configer
 from agentuniverse.base.config.custom_configer.custom_key_configer import CustomKeyConfiger
 from agentuniverse.base.component.component_enum import ComponentEnum
+from agentuniverse.base.util.monitor.monitor import Monitor
 from agentuniverse.base.util.system_util import get_project_root_path
 from agentuniverse.base.util.logging.logging_util import init_loggers
 from agentuniverse.agent_serve.web.request_task import RequestLibrary
@@ -98,6 +99,9 @@ class AgentUniverse(object):
         for ext_class in ext_classes:
             self.__dynamic_import_and_init(ext_class, configer)
+        # init monitor module
+        Monitor(configer=configer)
+
         # scan and register the components
         self.__scan_and_register(self.__config_container.app_configer)

View File

@@ -0,0 +1,97 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/6/5 15:33
# @Author : wangchongshi
# @Email : wangchongshi.wcs@antgroup.com
# @FileName: trace.py
import asyncio
import functools
import inspect
from functools import wraps

from agentuniverse.base.util.monitor.monitor import Monitor
from agentuniverse.llm.llm_output import LLMOutput

monitor = Monitor()


def trace_llm(func):
    """Annotation: @trace_llm

    Decorator to trace the LLM invocation, add llm input and output to the monitor.
    """

    @wraps(func)
    async def wrapper_async(*args, **kwargs):
        # get llm input from arguments
        llm_input = _get_llm_input(func, *args, **kwargs)
        # check whether the tracing switch is enabled
        self = llm_input.pop('self', None)
        if self and hasattr(self, 'tracing'):
            if self.tracing is False:
                return await func(*args, **kwargs)
        # invoke function
        result = await func(*args, **kwargs)
        # not streaming
        if isinstance(result, LLMOutput):
            # add llm invocation info to monitor
            monitor.trace_llm_invocation(source=func.__qualname__, llm_input=llm_input, llm_output=result.text)
            return result
        else:
            # streaming
            async def gen_iterator():
                llm_output = []
                async for chunk in result:
                    llm_output.append(chunk.text)
                    yield chunk
                # add llm invocation info to monitor
                monitor.trace_llm_invocation(source=func.__qualname__, llm_input=llm_input,
                                             llm_output="".join(llm_output))

            return gen_iterator()

    @functools.wraps(func)
    def wrapper_sync(*args, **kwargs):
        # get llm input from arguments
        llm_input = _get_llm_input(func, *args, **kwargs)
        # check whether the tracing switch is enabled
        self = llm_input.pop('self', None)
        if self and hasattr(self, 'tracing'):
            if not self.tracing:
                return func(*args, **kwargs)
        # invoke function
        result = func(*args, **kwargs)
        # not streaming
        if isinstance(result, LLMOutput):
            # add llm invocation info to monitor
            monitor.trace_llm_invocation(source=func.__qualname__, llm_input=llm_input, llm_output=result.text)
            return result
        else:
            # streaming
            def gen_iterator():
                llm_output = []
                for chunk in result:
                    llm_output.append(chunk.text)
                    yield chunk
                # add llm invocation info to monitor
                monitor.trace_llm_invocation(source=func.__qualname__, llm_input=llm_input,
                                             llm_output="".join(llm_output))

            return gen_iterator()

    if asyncio.iscoroutinefunction(func):
        # async function
        return wrapper_async
    else:
        # sync function
        return wrapper_sync


def _get_llm_input(func, *args, **kwargs) -> dict:
    """Get the llm input from arguments."""
    sig = inspect.signature(func)
    bound_args = sig.bind(*args, **kwargs)
    bound_args.apply_defaults()
    return {k: v for k, v in bound_args.arguments.items()}
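A minimal usage sketch of the new decorator (the MyLLM class below is hypothetical, not part of this commit): any call or acall wrapped with @trace_llm has its bound arguments captured via inspect.signature and, unless the owning instance sets tracing to False, the input/output pair is handed to Monitor.trace_llm_invocation; streaming results are re-wrapped in a generator so the record is written once the stream is exhausted.

    from typing import Any

    from agentuniverse.base.annotation.trace import trace_llm
    from agentuniverse.llm.llm import LLM
    from agentuniverse.llm.llm_output import LLMOutput


    class MyLLM(LLM):
        # hypothetical subclass, used only to illustrate the decorator

        @trace_llm
        def call(self, messages: list, **kwargs: Any) -> LLMOutput:
            # 'messages' (plus any kwargs) become llm_input; for a
            # non-streaming LLMOutput result, result.text becomes llm_output
            return LLMOutput(text='hello')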

View File

@@ -5,14 +5,12 @@
 # @Author : jerry.zzw
 # @Email : jerry.zzw@antgroup.com
 # @FileName: component_manager_base.py
-from agentuniverse.base.component.component_base import ComponentBase
-from agentuniverse.base.component.component_enum import ComponentEnum
 import importlib
 import inspect
 import pkgutil
+import copy
 from typing import TypeVar, Generic
 from agentuniverse.base.config.application_configer.application_config_manager import ApplicationConfigManager
+from agentuniverse.base.component.component_base import ComponentBase
+from agentuniverse.base.component.component_enum import ComponentEnum

 # generic type variable bound to ComponentBase
 ComponentTypeVar = TypeVar("ComponentTypeVar", bound=ComponentBase)
@@ -40,10 +38,12 @@ class ComponentManagerBase(Generic[ComponentTypeVar]):
         self._instance_obj_map.pop(component_instance_name)

     def get_instance_obj(self, component_instance_name: str,
-                         appname: str = None) -> ComponentTypeVar:
+                         appname: str = None, new_instance: bool = None) -> ComponentTypeVar:
         """Return the component instance object."""
         appname = appname or ApplicationConfigManager().app_configer.base_info_appname
         instance_code = f'{appname}.{self._component_type.value.lower()}.{component_instance_name}'
+        if new_instance:
+            return copy.deepcopy(self._instance_obj_map.get(instance_code))
         return self._instance_obj_map.get(instance_code)

     def get_instance_name_list(self) -> list[str]:
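The new_instance flag moves the defensive deepcopy into the manager itself, which is what lets the Planner changes above drop their copy.deepcopy calls: callers that mutate an instance per request ask for a fresh copy, while plain lookups keep returning the shared prototype. A short sketch (the instance name 'demo_llm' is illustrative, and the LLMManager import path is assumed):

    from agentuniverse.llm.llm_manager import LLMManager

    # a deep copy that is safe to mutate for a single request
    llm = LLMManager().get_instance_obj(component_instance_name='demo_llm', new_instance=True)

    # the shared prototype registered at startup; mutating it would leak across requests
    shared = LLMManager().get_instance_obj(component_instance_name='demo_llm')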

View File

@@ -26,6 +26,7 @@ class LLMConfiger(ComponentConfiger):
         self.__streaming: Optional[bool] = None
         self.__ext_info: Optional[dict] = None
         self.__max_context_length: Optional[int] = None
+        self.__tracing: Optional[bool] = None

     @property
     def name(self) -> Optional[str]:
@@ -70,6 +71,10 @@ class LLMConfiger(ComponentConfiger):
     def max_content_length(self) -> Optional[int]:
         return self.__max_context_length

+    @property
+    def tracing(self) -> Optional[bool]:
+        return self.__tracing
+
     def load(self) -> 'LLMConfiger':
         """Load the configuration by the Configer object.

         Returns:
@@ -97,6 +102,7 @@ class LLMConfiger(ComponentConfiger):
             self.__streaming = configer.value.get('streaming')
             self.__ext_info = configer.value.get('ext_info')
             self.__max_context_length = configer.value.get('max_context_length')
+            self.__tracing = configer.value.get('tracing')
         except Exception as e:
             raise Exception(f"Failed to parse the LLM configuration: {e}")
         return self

View File

@@ -0,0 +1,7 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-
# @Time : 2024/6/5 15:38
# @Author : wangchongshi
# @Email : wangchongshi.wcs@antgroup.com
# @FileName: __init__.py.py

View File

@@ -0,0 +1,64 @@
# !/usr/bin/env python3
# -*- coding:utf-8 -*-

# @Time : 2024/5/20 16:24
# @Author : wangchongshi
# @Email : wangchongshi.wcs@antgroup.com
# @FileName: monitor.py
import datetime
import json
import os
from typing import Union, Optional

from pydantic import BaseModel

from agentuniverse.base.annotation.singleton import singleton
from agentuniverse.base.config.configer import Configer

LLM_INVOCATION_SUBDIR = "llm_invocation"


@singleton
class Monitor(BaseModel):
    dir: Optional[str] = './monitor'
    activate: Optional[bool] = False

    def __init__(self, configer: Configer = None, **kwargs):
        super().__init__(**kwargs)
        if configer:
            config: dict = configer.value.get('MONITOR', {})
            self.dir = config.get('dir', './monitor')
            self.activate = config.get('activate', False)

    def trace_llm_invocation(self, source: str, llm_input: Union[str, dict], llm_output: Union[str, dict]) -> None:
        """Trace the llm invocation and save it to the monitor jsonl file."""
        if self.activate:
            try:
                import jsonlines
            except ImportError:
                raise ImportError(
                    "jsonlines is required to trace llm invocation: `pip install jsonlines`"
                )
            # get the current time
            date = datetime.datetime.now()
            llm_invocation = {
                "source": source,
                "date": date.strftime("%Y-%m-%d %H:%M:%S"),
                "llm_input": llm_input,
                "llm_output": llm_output,
            }
            # files are rotated by hour
            filename = f"llm_{date.strftime('%Y-%m-%d-%H')}.jsonl"
            # file path to save
            path_save = os.path.join(str(self._get_or_create_subdir(LLM_INVOCATION_SUBDIR)), filename)
            # write to jsonl
            with jsonlines.open(path_save, 'a') as writer:
                json_record = json.dumps(llm_invocation, ensure_ascii=False)
                writer.write(json_record)

    def _get_or_create_subdir(self, subdir: str) -> str:
        """Get or create a subdirectory if it doesn't exist in the monitor directory."""
        path = os.path.join(self.dir, subdir)
        os.makedirs(path, exist_ok=True)
        return path
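Note that trace_llm_invocation runs the record through json.dumps before handing it to jsonlines, and jsonlines JSON-encodes whatever it receives, so each stored line is a JSON string that itself contains the record and a consumer decodes twice. A small reader sketch under that assumption (the filename follows the hourly pattern above; the date is just an example):

    import json
    import os

    import jsonlines

    path = os.path.join('./monitor', 'llm_invocation', 'llm_2024-06-05-19.jsonl')
    with jsonlines.open(path) as reader:
        for line in reader:
            # each line is a JSON-encoded string wrapping the actual record
            record = json.loads(line)
            print(record['source'], record['date'])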

View File

@@ -5,14 +5,13 @@
 # @Author : weizjajj
 # @Email : weizhongjie.wzj@antgroup.com
 # @FileName: baichuan_openai_style_llm.py
-from typing import Optional
+from typing import Optional, Any, Union, Iterator, AsyncIterator

-from langchain_community.chat_models import ChatBaichuan
-from langchain_core.language_models import BaseLanguageModel
 from pydantic import Field

+from agentuniverse.base.annotation.trace import trace_llm
 from agentuniverse.base.util.env_util import get_from_env
 from agentuniverse.llm.llm_output import LLMOutput
 from agentuniverse.llm.openai_style_llm import OpenAIStyleLLM

 BAICHUAN_Max_CONTEXT_LENGTH = {
@@ -41,3 +40,27 @@ class BAICHUANOpenAIStyleLLM(OpenAIStyleLLM):
         if super().max_context_length():
             return super().max_context_length()
         return BAICHUAN_Max_CONTEXT_LENGTH.get(self.model_name, 8000)
+
+    @trace_llm
+    def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
+        """ The call method of the LLM.
+
+        Users can customize how the model interacts by overriding call method of the LLM class.
+
+        Args:
+            messages (list): The messages to send to the LLM.
+            **kwargs: Arbitrary keyword arguments.
+        """
+        return super().call(messages, **kwargs)
+
+    @trace_llm
+    async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
+        """ The async call method of the LLM.
+
+        Users can customize how the model interacts by overriding acall method of the LLM class.
+
+        Args:
+            messages (list): The messages to send to the LLM.
+            **kwargs: Arbitrary keyword arguments.
+        """
+        return await super().acall(messages, **kwargs)

View File

@@ -5,15 +5,15 @@
 # @Author : wangchongshi
 # @Email : wangchongshi.wcs@antgroup.com
 # @FileName: default_openai_llm.py
-from typing import Any, Optional
+from typing import Any, Optional, Iterator, Union, AsyncIterator

 from pydantic import Field

+from agentuniverse.base.annotation.trace import trace_llm
 from agentuniverse.base.util.env_util import get_from_env
 from agentuniverse.llm.llm_output import LLMOutput
 from agentuniverse.llm.openai_style_llm import OpenAIStyleLLM

 OPENAI_MAX_CONTEXT_LENGTH = {
     "gpt-3.5-turbo": 4096,
     "gpt-3.5-turbo-0301": 4096,
@@ -47,7 +47,8 @@ class DefaultOpenAILLM(OpenAIStyleLLM):
     api_base: Optional[str] = Field(default_factory=lambda: get_from_env("OPENAI_API_BASE"))
     proxy: Optional[str] = Field(default_factory=lambda: get_from_env("OPENAI_PROXY"))

-    def call(self, messages: list, **kwargs: Any) -> LLMOutput:
+    @trace_llm
+    def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
         """ The call method of the LLM.

         Users can customize how the model interacts by overriding call method of the LLM class.
@@ -58,7 +59,8 @@ class DefaultOpenAILLM(OpenAIStyleLLM):
         """
         return super().call(messages, **kwargs)

-    async def acall(self, messages: list, **kwargs: Any) -> LLMOutput:
+    @trace_llm
+    async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
         """ The async call method of the LLM.

         Users can customize how the model interacts by overriding acall method of the LLM class.

View File

@@ -5,13 +5,14 @@
 # @Author : weizjajj
 # @Email : weizhongjie.wzj@antgroup.com
 # @FileName: kimi_openai_style_llm.py
-from typing import Optional
+from typing import Optional, Any, Union, Iterator, AsyncIterator

 import requests
 from pydantic import Field

+from agentuniverse.base.annotation.trace import trace_llm
 from agentuniverse.base.util.env_util import get_from_env
 from agentuniverse.llm.llm_output import LLMOutput
 from agentuniverse.llm.openai_style_llm import OpenAIStyleLLM

 KIMI_Max_CONTEXT_LENGTH = {
@@ -33,6 +34,30 @@ class KIMIOpenAIStyleLLM(OpenAIStyleLLM):
     proxy: Optional[str] = Field(default_factory=lambda: get_from_env("KIMI_PROXY"))
     organization: Optional[str] = Field(default_factory=lambda: get_from_env("KIMI_ORGANIZATION"))

+    @trace_llm
+    def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
+        """ The call method of the LLM.
+
+        Users can customize how the model interacts by overriding call method of the LLM class.
+
+        Args:
+            messages (list): The messages to send to the LLM.
+            **kwargs: Arbitrary keyword arguments.
+        """
+        return super().call(messages, **kwargs)
+
+    @trace_llm
+    async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
+        """ The async call method of the LLM.
+
+        Users can customize how the model interacts by overriding acall method of the LLM class.
+
+        Args:
+            messages (list): The messages to send to the LLM.
+            **kwargs: Arbitrary keyword arguments.
+        """
+        return await super().acall(messages, **kwargs)
+
     def max_context_length(self) -> int:
         if super().max_context_length():
             return super().max_context_length()

View File

@@ -5,13 +5,14 @@
 # @Author : weizjajj
 # @Email : weizhongjie.wzj@antgroup.com
 # @FileName: qwen_openai_style_llm.py
-from typing import Optional
+from typing import Optional, Any, Union, Iterator, AsyncIterator

 from dashscope import get_tokenizer
 from pydantic import Field

+from agentuniverse.base.annotation.trace import trace_llm
 from agentuniverse.base.util.env_util import get_from_env
 from agentuniverse.llm.llm_output import LLMOutput
 from agentuniverse.llm.openai_style_llm import OpenAIStyleLLM

 QWen_Max_CONTEXT_LENGTH = {
@@ -26,7 +27,6 @@ QWen_Max_CONTEXT_LENGTH = {

 class QWenOpenAIStyleLLM(OpenAIStyleLLM):
     """
     QWen OpenAI style LLM
-
     Args:
@@ -39,6 +39,30 @@ class QWenOpenAIStyleLLM(OpenAIStyleLLM):
     proxy: Optional[str] = Field(default_factory=lambda: get_from_env("DASHSCOPE_PROXY"))
     organization: Optional[str] = Field(default_factory=lambda: get_from_env("DASHSCOPE_ORGANIZATION"))

+    @trace_llm
+    def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
+        """ The call method of the LLM.
+
+        Users can customize how the model interacts by overriding call method of the LLM class.
+
+        Args:
+            messages (list): The messages to send to the LLM.
+            **kwargs: Arbitrary keyword arguments.
+        """
+        return super().call(messages, **kwargs)
+
+    @trace_llm
+    async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
+        """ The async call method of the LLM.
+
+        Users can customize how the model interacts by overriding acall method of the LLM class.
+
+        Args:
+            messages (list): The messages to send to the LLM.
+            **kwargs: Arbitrary keyword arguments.
+        """
+        return await super().acall(messages, **kwargs)
+
     def max_context_length(self) -> int:
         if super().max_context_length():
             return super().max_context_length()

View File

@@ -5,16 +5,15 @@
 # @Author : weizjajj
 # @Email : weizhongjie.wzj@antgroup.com
 # @FileName: wenxin_llm.py
 from typing import Any, Union, AsyncIterator, Iterator

 import qianfan
-from langchain_community.chat_models import QianfanChatEndpoint
-from langchain_core.language_models import BaseLanguageModel
 from pydantic import Field
 from qianfan import QfResponse
 from qianfan.resources.tools import tokenizer

+from agentuniverse.base.annotation.trace import trace_llm
 from agentuniverse.base.util.env_util import get_from_env
 from agentuniverse.llm.llm import LLM
 from agentuniverse.llm.llm_output import LLMOutput
@@ -51,6 +50,7 @@ class WenXinLLM(LLM):
         """Create a new Qianfan client."""
         return qianfan.ChatCompletion(ak=self.qianfan_ak, sk=self.qianfan_sk)

+    @trace_llm
     def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
         """Run the OpenAI LLM.
@@ -73,6 +73,7 @@ class WenXinLLM(LLM):
             return self.parse_result(chat_completion)
         return self.generate_stream_result(chat_completion)

+    @trace_llm
     async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
         """Asynchronously run the OpenAI LLM.

View File

@@ -44,6 +44,7 @@ class LLM(ComponentBase):
     max_retries: Optional[int] = 2
     streaming: Optional[bool] = False
     ext_info: Optional[dict] = None
+    tracing: Optional[bool] = None
     __max_context_length: Optional[int] = None

     def __init__(self, **kwargs):
@@ -101,7 +102,7 @@ class LLM(ComponentBase):
             self.streaming = component_configer.streaming
         if component_configer.ext_info:
             self.ext_info = component_configer.ext_info
+        self.tracing = component_configer.tracing
         return self

     def set_by_agent_model(self, **kwargs) -> None:
@@ -140,6 +141,3 @@ class LLM(ComponentBase):

         Returns:
             The integer number of tokens in the text.
         """
-
-    class Config:
-        protected_namespaces = ()
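Since tracing is an ordinary pydantic field on LLM, the switch can come from the component configuration (via the LLMConfiger.tracing property added above) or be set programmatically; when it is False, @trace_llm simply invokes the original method and writes nothing. A hedged sketch using the default OpenAI LLM from this commit:

    from agentuniverse.llm.default.default_openai_llm import DefaultOpenAILLM

    # tracing=False makes @trace_llm short-circuit, so no monitor record is written
    llm = DefaultOpenAILLM(model_name='gpt-4o', tracing=False)
    output = llm.call(messages=[{'role': 'user', 'content': 'hi'}])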

View File

@@ -19,11 +19,12 @@ from agentuniverse.llm.openai_style_langchain_instance import LangchainOpenAISty

 class OpenAIStyleLLM(LLM):
-    """
-    This is a wrapper around the OpenAI API that implements a chat interface for the LLM.
+    """This is a wrapper around the OpenAI API that implements a chat interface for the LLM.

     It uses the `chat` endpoint of the OpenAI API.
     It also supports using the `completion` endpoint instead of the `chat` endpoint.
+    It supports both sync and async modes.

     Attributes:
         api_key (Optional[str]): The API key to use for authentication.
         organization (Optional[str]): The organization ID to use for authentication.

View File

@@ -44,4 +44,8 @@ gunicorn_config_path = './gunicorn_config.toml'
 [GRPC]
 activate = 'false'
 max_workers = 10
-server_port = 50051
\ No newline at end of file
+server_port = 50051
+
+[MONITOR]
+activate = true
+dir = './monitor'

View File

@@ -13,7 +13,7 @@ from langchain_core.prompts import PromptTemplate
 from agentuniverse.agent.memory.chat_memory import ChatMemory
 from agentuniverse.agent.memory.enum import MemoryTypeEnum
 from agentuniverse.agent.memory.message import Message
-from agentuniverse.llm.openai_llm import OpenAILLM
+from agentuniverse.llm.default.default_openai_llm import DefaultOpenAILLM

 template = """
 You are a chatbot having a conversation with a human.
@@ -33,12 +33,12 @@ class MemoryTest(unittest.TestCase):

     def setUp(self) -> None:
         init_params = dict()
-        init_params['model_name'] = 'gpt-3.5-turbo'
+        init_params['model_name'] = 'gpt-4o'
         init_params['temperature'] = 0.7
         init_params['max_retries'] = 2
         init_params['streaming'] = False
-        init_params['llm'] = OpenAILLM(**init_params)
+        init_params['llm'] = DefaultOpenAILLM(**init_params)
         init_params['memory_key'] = 'chat_history'
         init_params['max_tokens'] = '1024'
         init_params['type'] = MemoryTypeEnum.LONG_TERM

View File

@@ -9,7 +9,7 @@ import unittest

 from langchain.chains import ConversationChain

-from agentuniverse.llm.openai_llm import OpenAILLM
+from agentuniverse.llm.default.default_openai_llm import DefaultOpenAILLM


 class LLMTest(unittest.TestCase):
@@ -18,7 +18,7 @@ class LLMTest(unittest.TestCase):
     """

     def setUp(self) -> None:
-        self.llm = OpenAILLM(model_name='gpt-4o')
+        self.llm = DefaultOpenAILLM(model_name='gpt-4o')

     def test_call(self) -> None:
         messages = [