22 KiB
如何自定义模型LLM组件
根据agentUniverse领域组件的设计特性,创建一个模型LLM定义由2部分组成:
- llm_xx.yaml
- llm_xx.py
其中llm_xx.yaml必须创建, 其包含了LLM组件的名称、描述、模型名等重要属性信息;llm_xx.py为按需创建,其包含了llm组件的特定行为并且支持了用户对llm进行标准的行为自定义注入。理解这一原理后,让我们具体看看该如何创建这两部分内容。
创建LLM配置 - llm_xx.yaml
我们将详细介绍配置中的各组成部分。
设置LLM的基本属性
name: LLM名称,LLM实例名称,您可以按照自己的期望设置任何名字description: LLM描述, 按照您的实际需求填写model_name: 接入的LLM模型官方名称,例如OpenAi系列中的gpt-4o、gpt-3.5-turbo等max_retries: LLM访问的最大重试次数max_tokens: LLM模型实例支持的最大token数量,该属性必须小于官方提供的model_name可处理token的最大值
设置LLM组件元信息
metadata - 组件元信息
type: 组件类型,'LLM'module: LLM实体包路径class: LLM实体类名
所有已经提供的LLM组件都将提供对应的module、class信息,复制到该部分即可。该部分会将LLM组件的所有配置与其行为组成一个整体,若您定义了LLM的标准行为,则需要按照实际路径填写这部分,我们将在后文中的从已有的LLM对象创建中进一步说明。
一个LLM定义配置的实际样例
name: 'demo_llm'
description: 'demo openai'
model_name: 'gpt-3.5-turbo'
max_tokens: 1000
max_retries: 2
metadata:
type: 'LLM'
module: 'agentuniverse.llm.default.default_openai_llm'
class: 'DefaultOpenAILLM'
上述是一个实际的LLM配置的样例。除了上述介绍的标准配置项,您可以在我们的样例工程中的sample_standard_app.intelligence.agentic.llm路径下更多的LLM配置yaml样例。
除此之外,agentuniverse不限制用户对LLM yaml配置内容进行扩展,您可以根据自己的要求创建任何自定义配置key,但请注意不要与上述默认的配置关键字重名。
创建LLM领域行为定义 - llm_xx.py
在本部分您可以对任何LLM的行为进行扩展,当然如果您完全使用已有的LLM能力那么本部分并非必须。
在本节我们将重点介绍常用的LLM的行为定义与您可能在实际LLM行为定义过程中使用的常用技巧。
创建LLM类对象
创建对应的LLM类对象并继承agentUniverse框架LLM基础类 LLM;
定制对应的LLM领域行为
常用可定制的LLM行为如下。
_new_client方法
我们知道非常多的模型服务提供了标准的client sdk,在这个方法中您可以将LLM服务官方提供的client进行注入,如果您使用的模型并没有提供标准的client这一步是非必需的。
def _new_client(self):
"""Initialize the client."""
pass
_new_async_client方法
同_new_client方法,在这个方法中您可以将LLM服务官方提供的异步client进行注入,如果您使用的模型并没有提供标准的异步client这一步是非必需的。
def _new_async_client(self):
"""Initialize the async client."""
pass
call和acall方法
LLM基础类提供的call(同步调用)和acall(异步调用)两个抽象方法,用户可以通过实现call和acall方法自定义模型调用方式。 其标准定义如下:
from abc import abstractmethod
from typing import Optional, Any, AsyncIterator, Iterator, Union
from agentuniverse.llm.llm_output import LLMOutput
@abstractmethod
def call(self, *args: Any, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
"""Run the LLM."""
@abstractmethod
async def acall(self, *args: Any, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
"""Asynchronously run the LLM."""
as_langchain方法
agentUniverse在底层使用了langchain的能力,agentUniverse兼容使用langchain的LLM的定义方式,若您的项目已使用过langchain,那么在本方法中只需要将Langchain框架BaseLanguageModel基础模型放入,即可将agentUniverse与langchain融合使用。agentUniverse在可支持与Langchain、Semantic Kernel等任何类似的orchestration框架融合,但是目前我们重点关注langchain。
from langchain_core.language_models.base import BaseLanguageModel
def as_langchain(self) -> BaseLanguageModel:
"""Convert to the langchain llm class."""
pass
max_context_length方法
定义LLM模型对应的上下文长度,如GPT-4 max_context_length方法返回8192,GPT-3.5-turbo max_context_length返回4096,您可以在模型的官方文档里找到这个数字。
get_num_tokens方法
get_num_tokens定义模型的编码方式及token计算方式,传入数据,输出数据对应的token数量。
一个实际的LLM领域行为定义样例
以Open系列LLM定义为例,OpenAILLM.py
from typing import Any, Optional, AsyncIterator, Iterator, Union
import httpx
from langchain_core.language_models.base import BaseLanguageModel
from openai import OpenAI, AsyncOpenAI
from pydantic import Field
import tiktoken
from agentuniverse.llm.langchain_instance import LangchainOpenAI
from agentuniverse.llm.llm import LLM, LLMOutput
from agentuniverse.base.util.env_util import get_from_env
OPENAI_MAX_CONTEXT_LENGTH = {
"gpt-3.5-turbo": 4096,
"gpt-3.5-turbo-0301": 4096,
"gpt-3.5-turbo-0613": 4096,
"gpt-3.5-turbo-16k": 16384,
"gpt-3.5-turbo-16k-0613": 16384,
"gpt-35-turbo": 4096,
"gpt-35-turbo-16k": 16384,
"gpt-3.5-turbo-1106": 16384,
"gpt-3.5-turbo-0125": 16384,
"gpt-4-0314": 8192,
"gpt-4": 8192,
"gpt-4-32k": 32768,
"gpt-4-32k-0613": 32768,
"gpt-4-0613": 8192,
"gpt-4-1106-preview": 128000,
"gpt-4-turbo": 128000,
"gpt-4o": 128000,
"gpt-4o-2024-05-13": 128000,
}
class OpenAILLM(LLM):
"""The openai llm class.
Attributes:
openai_api_key (Optional[str], optional): The API key for the OpenAI API.
This automatically infers the `openai_api_key` from the environment variable `OPENAI_API_KEY` if not provided.
openai_organization (Optional[str], optional): The OpenAI organization.
This automatically infers the `openai_organization` from the environment variable `OPENAI_ORGANIZATION` if not provided.
openai_api_base (Optional[str], optional): The OpenAI base url.
This automatically infers the `openai_api_base` from the environment variable `OPENAI_API_BASE` if not provided.
openai_client_args (Optional[dict], optional): Additional arguments to pass to the OpenAI client.
"""
openai_api_key: Optional[str] = Field(default_factory=lambda: get_from_env("OPENAI_API_KEY"))
openai_organization: Optional[str] = Field(default_factory=lambda: get_from_env("OPENAI_ORGANIZATION"))
openai_api_base: Optional[str] = Field(default_factory=lambda: get_from_env("OPENAI_API_BASE"))
openai_proxy: Optional[str] = Field(default_factory=lambda: get_from_env("OPENAI_PROXY"))
openai_client_args: Optional[dict] = None
def _new_client(self):
"""Initialize the openai client."""
return OpenAI(
api_key=self.openai_api_key,
organization=self.openai_organization,
base_url=self.openai_api_base,
timeout=self.request_timeout,
max_retries=self.max_retries,
http_client=httpx.Client(proxy=self.openai_proxy) if self.openai_proxy else None,
**(self.openai_client_args or {}),
)
def _new_async_client(self):
"""Initialize the openai async client."""
return AsyncOpenAI(
api_key=self.openai_api_key,
organization=self.openai_organization,
base_url=self.openai_api_base,
timeout=self.request_timeout,
max_retries=self.max_retries,
http_client=httpx.AsyncClient(proxy=self.openai_proxy) if self.openai_proxy else None,
**(self.openai_client_args or {}),
)
def call(self, messages: list, **kwargs: Any) -> Union[LLMOutput, Iterator[LLMOutput]]:
"""Run the OpenAI LLM.
Args:
messages (list): The messages to send to the LLM.
**kwargs: Arbitrary keyword arguments.
"""
streaming = kwargs.pop("streaming") if "streaming" in kwargs else self.streaming
self.client = self._new_client()
with self.client as client:
chat_completion = client.chat.completions.create(
messages=messages,
model=kwargs.pop('model', self.model_name),
temperature=kwargs.pop('temperature', self.temperature),
stream=kwargs.pop('stream', streaming),
max_tokens=kwargs.pop('max_tokens', self.max_tokens),
**kwargs,
)
if not streaming:
text = chat_completion.choices[0].message.content
return LLMOutput(text=text, raw=chat_completion.model_dump())
return self.generate_stream_result(chat_completion)
async def acall(self, messages: list, **kwargs: Any) -> Union[LLMOutput, AsyncIterator[LLMOutput]]:
"""Asynchronously run the OpenAI LLM.
Args:
messages (list): The messages to send to the LLM.
**kwargs: Arbitrary keyword arguments.
"""
streaming = kwargs.pop("streaming") if "streaming" in kwargs else self.streaming
self.async_client = self._new_async_client()
async with self.async_client as async_client:
chat_completion = await async_client.chat.completions.create(
messages=messages,
model=kwargs.pop('model', self.model_name),
temperature=kwargs.pop('temperature', self.temperature),
stream=kwargs.pop('stream', streaming),
max_tokens=kwargs.pop('max_tokens', self.max_tokens),
**kwargs,
)
if not streaming:
text = chat_completion.choices[0].message.content
return LLMOutput(text=text, raw=chat_completion.model_dump())
return self.agenerate_stream_result(chat_completion)
def as_langchain(self) -> BaseLanguageModel:
"""Convert the agentUniverse(AU) openai llm class to the langchain openai llm class."""
return LangchainOpenAI(self)
def max_context_length(self) -> int:
"""Max context length.
The total length of input tokens and generated tokens is limited by the openai model's context length.
"""
return OPENAI_MAX_CONTEXT_LENGTH.get(self.model_name, 4096)
def get_num_tokens(self, text: str) -> int:
"""Get the number of tokens present in the text.
Useful for checking if an input will fit in an openai model's context window.
Args:
text: The string input to tokenize.
Returns:
The integer number of tokens in the text.
"""
try:
encoding = tiktoken.encoding_for_model(self.model_name)
except KeyError:
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
@staticmethod
def parse_result(chunk):
"""Generate the result of the stream."""
chat_completion = chunk
if not isinstance(chunk, dict):
chunk = chunk.dict()
if len(chunk["choices"]) == 0:
return
choice = chunk["choices"][0]
message = choice.get("delta")
text = message.get("content")
if not text:
return
return LLMOutput(text=text, raw=chat_completion.model_dump())
@classmethod
def generate_stream_result(cls, stream: Iterator) -> Iterator[LLMOutput]:
"""Generate the result of the stream."""
for chunk in stream:
llm_output = cls.parse_result(chunk)
if llm_output:
yield llm_output
@classmethod
async def agenerate_stream_result(cls, stream: AsyncIterator) -> AsyncIterator[LLMOutput]:
"""Generate the result of the stream."""
async for chunk in stream:
llm_output = cls.parse_result(chunk)
if llm_output:
yield llm_output
在上述的样例中,我们通过_new_client、_new_async_client方法接入了OpenAi标准的client sdk,实现call与acall实现OpenAi标准同步、异步调用方式,通过在as_langchain中引入LangchainOpenAI对象完成与langchain框架的融合。
LangchainOpenAI.py 为langchain的BaseLanguageModel对象,可关注下列例子。
from typing import Any, List, Optional, AsyncIterator
from langchain.callbacks.manager import AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun
from langchain.chat_models import ChatOpenAI
from langchain.schema import BaseMessage, ChatResult
from langchain_community.chat_models.openai import _convert_delta_to_message_chunk
from langchain_core.language_models.chat_models import generate_from_stream, agenerate_from_stream
from langchain_core.messages import AIMessageChunk
from langchain_core.outputs import ChatGenerationChunk
from agentuniverse.llm.llm import LLM
class LangchainOpenAI(ChatOpenAI):
"""Langchain OpenAI LLM wrapper."""
llm: Optional[LLM] = None
def __init__(self, llm: LLM):
"""The __init__ method.
The agentUniverse LLM instance is passed to this class as an argument.
Convert the attributes of agentUniverse(AU) LLM instance to the LangchainOpenAI object for initialization
Args:
llm (LLM): the agentUniverse(AU) LLM instance.
"""
init_params = dict()
init_params['model_name'] = llm.model_name if llm.model_name else 'gpt-3.5-turbo'
init_params['temperature'] = llm.temperature if llm.temperature else 0.7
init_params['request_timeout'] = llm.request_timeout
init_params['max_tokens'] = llm.max_tokens
init_params['max_retries'] = llm.max_retries if llm.max_retries else 2
init_params['streaming'] = llm.streaming if llm.streaming else False
init_params['openai_api_key'] = llm.openai_api_key if llm.openai_api_key else 'blank'
init_params['openai_organization'] = llm.openai_organization
init_params['openai_api_base'] = llm.openai_api_base
init_params['openai_proxy'] = llm.openai_proxy
super().__init__(**init_params)
self.llm = llm
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
stream: Optional[bool] = None,
**kwargs,
) -> ChatResult:
"""Run the Langchain OpenAI LLM."""
should_stream = stream if stream is not None else self.streaming
message_dicts, params = self._create_message_dicts(messages, stop)
params = {**params, **kwargs}
llm_output = self.llm.call(messages=message_dicts, **params)
if not should_stream:
return self._create_chat_result(llm_output.raw)
stream_iter = self.as_langchain_chunk(llm_output)
return generate_from_stream(stream_iter)
async def _agenerate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
stream: Optional[bool] = None,
**kwargs: Any,
) -> ChatResult:
"""Asynchronously run the Langchain OpenAI LLM."""
should_stream = stream if stream is not None else self.streaming
message_dicts, params = self._create_message_dicts(messages, stop)
params = {**params, **kwargs}
llm_output = await self.llm.acall(messages=message_dicts, **params)
if not should_stream:
return self._create_chat_result(llm_output.raw)
stream_iter = self.as_langchain_achunk(llm_output)
return await agenerate_from_stream(stream_iter)
@staticmethod
def as_langchain_chunk(stream, run_manager=None):
default_chunk_class = AIMessageChunk
for llm_result in stream:
chunk = llm_result.raw
if not isinstance(chunk, dict):
chunk = chunk.dict()
if len(chunk["choices"]) == 0:
continue
choice = chunk["choices"][0]
chunk = _convert_delta_to_message_chunk(
choice["delta"], default_chunk_class
)
finish_reason = choice.get("finish_reason")
generation_info = (
dict(finish_reason=finish_reason) if finish_reason is not None else None
)
default_chunk_class = chunk.__class__
chunk = ChatGenerationChunk(message=chunk, generation_info=generation_info)
yield chunk
if run_manager:
run_manager.on_llm_new_token(chunk.text, chunk=chunk)
@staticmethod
async def as_langchain_achunk(stream_iterator: AsyncIterator, run_manager=None) \
-> AsyncIterator[ChatGenerationChunk]:
default_chunk_class = AIMessageChunk
async for llm_result in stream_iterator:
chunk = llm_result.raw
if not isinstance(chunk, dict):
chunk = chunk.dict()
if len(chunk["choices"]) == 0:
continue
choice = chunk["choices"][0]
chunk = _convert_delta_to_message_chunk(
choice["delta"], default_chunk_class
)
finish_reason = choice.get("finish_reason")
generation_info = (
dict(finish_reason=finish_reason) if finish_reason is not None else None
)
default_chunk_class = chunk.__class__
chunk = ChatGenerationChunk(message=chunk, generation_info=generation_info)
yield chunk
if run_manager:
await run_manager.on_llm_new_token(token=chunk.text, chunk=chunk)
在agentUniverse与langchain的融合对象中,您只需要重构langchain的LLM模型生成_generate与_agenerate方法即可,其中重点使用了self.llm.call与self.llm.acall方法,其他部分我们在后文的实用技巧中会提及在此不额外说明。
关注您定义的LLM所在的包路径
通过上面的LLM配置与领域定义部分,您已经掌握了LLM定义创建的所有步骤;接下去我们将使用这些LLM,在使用前请关注创建的LLM是否在正确的包扫描路径内。
在agentUniverse项目的config.toml中需要配置LLM文件对应的package, 请再次确认您创建的文件所在的包路径是否在CORE_PACKAGE中llm路径或其子路径下。
以示例工程中的配置为例,如下:
[CORE_PACKAGE]
# Scan and register llm components for all paths under this list, with priority over the default.
llm = ['sample_standard_app.intelligence.agentic.llm']
关注您使用的LLM关联的api_key
模型的api_key基础参数,以openai为例如openai_api_key、openai_organization、openai_base、openai_proxy等信息,您可以将信息配置在系统环境变量,模型对象在创建初始化实例过程中会读取对应系统环境变量,实现参数装配;此外您也可以将密钥等信息配置在agentUniverse的custom_key.toml中(我们推荐您将这类配置放置在工程外或在.gitignore中去除跟踪,以避免关键信息被git跟踪而泄漏),agentUniverse系统初始化过程中,会自动读取配置文件,将配置信息自动注册到系统环境变量中。
下面是一个实际的custom_key.toml配置:
# Example file of custom_key.toml. Rename to custom_key.toml while using.
[KEY_LIST]
# Perform a full component scan and registration for all the paths under this list.
SERPER_API_KEY='xxx'
OPENAI_API_KEY='xxx'
模型LLM组件创建的其他技巧
加入LLM流式能力
在定制对应的LLM领域行为的样例中已经提供了流式能力的能力,您可以关注call与acall中的generate_stream_result与agenerate_stream_result方法。
如果您使用了Langchain对应的LLM类,需要在_generate与_agenerate中实现流式返回,可重点关注样例LangchainOpenAI.py中的as_langchain_chunk与as_langchain_achunk方法。
从已有的LLM对象创建
agentUniverse允许从已有的LLM组件对象中快速创建LLM实例。 如OpenAI的LLM系列已经由框架提供,其标准的元信息如下:
module: 'agentuniverse.llm.default.default_openai_llm'
class: 'DefaultOpenAILLM'
假如我们需要配置定义一个基于gpt-3.5-turbo模型,最大token限制为1000,失败重试次数为2次的LLM实例,其配置如下:
name: 'demo_llm'
description: 'demo openai'
model_name: 'gpt-3.5-turbo'
max_tokens: 1000
max_retries: 2
metadata:
type: 'LLM'
module: 'agentuniverse.llm.default.default_openai_llm'
class: 'DefaultOpenAILLM'
您可以在本文的了解更多已有模型LLM组件部分查询更多的LLM元信息。
如何使用模型LLM组件
在Agent中配置使用
您可以根据智能体创建与使用中的内容在agent的llm_model中设置您创建的任意LLM。
可参考示例:demo_multillm_agent, 具体文件路径为 sample_standard_app/intelligence/agentic/agent/agent_instance/rag_agent_case/demo_multillm_agent.yaml。
使用LLM管理器
通过LLM管理器中的.get_instance_obj(xx_llm_name) 方法可以获取对应名称的LLM实例。
from agentuniverse.llm.llm import LLM
from agentuniverse.llm.llm_manager import LLMManager
llm: LLM = LLMManager().get_instance_obj('llm_name')
了解更多已有模型LLM组件
框架提供的更多LLM组件在agentuniverse.llm.default包路径下,您可以进一步查看对应代码或在我们的扩展组件介绍部分进一步了解他们,更多已经接入的模型您可以在模型列表章节中了解。
总结
至此您已经掌握了模型LLM的定义与使用,赶快去尝试定义与使用LLM吧。