diff --git a/agentuniverse/base/util/logging/general_logger.py b/agentuniverse/base/util/logging/general_logger.py
index ac05e688..cc24c528 100644
--- a/agentuniverse/base/util/logging/general_logger.py
+++ b/agentuniverse/base/util/logging/general_logger.py
@@ -167,6 +167,10 @@ class GeneralLogger(Logger):
             context_prefix=get_context_prefix()
         ).warning(msg, *args, **kwargs)

+    async def awarn(self, msg, *args, **kwargs):
+        self.warn(msg, *args, **kwargs)
+        await self._logger.complete()
+
     def info(self, msg, *args, **kwargs):
         self._logger.opt(depth=self.get_inheritance_depth()).bind(
             log_type=LogTypeEnum.default,
@@ -174,6 +178,10 @@ class GeneralLogger(Logger):
             context_prefix=get_context_prefix()
         ).info(msg, *args, **kwargs)

+    async def ainfo(self, msg, *args, **kwargs):
+        self.info(msg, *args, **kwargs)
+        await self._logger.complete()
+
     def error(self, msg, *args, **kwargs):
         self._logger.opt(depth=self.get_inheritance_depth()).bind(
             log_type=LogTypeEnum.default,
@@ -181,6 +189,10 @@ class GeneralLogger(Logger):
             context_prefix=get_context_prefix()
         ).error(msg, *args, **kwargs)

+    async def aerror(self, msg, *args, **kwargs):
+        self.error(msg, *args, **kwargs)
+        await self._logger.complete()
+
     def critical(self, msg, *args, **kwargs):
         self._logger.opt(depth=self.get_inheritance_depth()).bind(
             log_type=LogTypeEnum.default,
@@ -188,6 +200,10 @@ class GeneralLogger(Logger):
             context_prefix=get_context_prefix()
         ).critical(msg, *args, **kwargs)

+    async def acritical(self, msg, *args, **kwargs):
+        self.critical(msg, *args, **kwargs)
+        await self._logger.complete()
+
     def trace(self, msg, *args, **kwargs):
         self._logger.opt(depth=self.get_inheritance_depth()).bind(
             log_type=LogTypeEnum.default,
@@ -195,6 +211,10 @@ class GeneralLogger(Logger):
             context_prefix=get_context_prefix()
         ).trace(msg, *args, **kwargs)

+    async def atrace(self, msg, *args, **kwargs):
+        self.trace(msg, *args, **kwargs)
+        await self._logger.complete()
+
     def debug(self, msg, *args, **kwargs):
         self._logger.opt(depth=self.get_inheritance_depth()).bind(
             log_type=LogTypeEnum.default,
@@ -202,6 +222,10 @@ class GeneralLogger(Logger):
             context_prefix=get_context_prefix()
         ).debug(msg, *args, **kwargs)

+    async def adebug(self, msg, *args, **kwargs):
+        self.debug(msg, *args, **kwargs)
+        await self._logger.complete()
+
     def _add_handler(self, log_level: LOG_LEVEL = "INFO"):
         """Add a new loguru log handler, use instance module name to filter
         out message recorded by this instance.
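Reviewer note: a minimal usage sketch of the new awaitable logging methods. It assumes `init_loggers()` has already run at application startup; the module name `demo_module` and the `handle_request` coroutine are made up for illustration.

```python
import asyncio

from agentuniverse.base.util.logging.logging_util import get_module_logger

# Hypothetical module name, for illustration only; assumes init_loggers()
# has already been called during application startup.
LOGGER = get_module_logger("demo_module")


async def handle_request(payload: str) -> None:
    # The async variants log through the existing sync methods and then
    # await logger.complete(), flushing pending messages in async sinks.
    await LOGGER.ainfo(f"received payload: {payload}")
    try:
        result = payload.upper()
        await LOGGER.adebug(f"processed payload: {result}")
    except Exception as e:
        await LOGGER.aerror(f"processing failed: {e}")


if __name__ == "__main__":
    asyncio.run(handle_request("hello"))
```

Because each `a*` method awaits `logger.complete()`, coroutine-based sinks such as the new `AsyncSlsSink` are drained before the awaiting coroutine continues.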
diff --git a/agentuniverse/base/util/logging/logging_util.py b/agentuniverse/base/util/logging/logging_util.py
index edc92f42..134a00f0 100644
--- a/agentuniverse/base/util/logging/logging_util.py
+++ b/agentuniverse/base/util/logging/logging_util.py
@@ -11,6 +11,7 @@ import sys
 from typing import Optional
 from pathlib import Path
 from typing_extensions import deprecated
+import asyncio

 import loguru

@@ -122,6 +123,30 @@ def _add_sls_log_handler():
     )


+def _add_sls_log_async_handler():
+    """Add an async handler to record all sls type logs."""
+    from agentuniverse_extension.logger.sls_sink import AsyncSlsSink, AsyncSlsSender
+    sls_sender = AsyncSlsSender(LoggingConfig.sls_project,
+                                LoggingConfig.sls_log_store,
+                                LoggingConfig.sls_endpoint,
+                                LoggingConfig.access_key_id,
+                                LoggingConfig.access_key_secret,
+                                LoggingConfig.sls_log_queue_max_size,
+                                LoggingConfig.sls_log_send_interval)
+    loop = asyncio.get_event_loop_policy().get_event_loop()
+    loop.create_task(sls_sender.start())
+
+    def _sls_filter(record):
+        return record["extra"].get('log_type') == LogTypeEnum.default or record["extra"].get('log_type') == LogTypeEnum.sls
+    loguru.logger.add(
+        sink=AsyncSlsSink(sls_sender),
+        format=LoggingConfig.log_format,
+        filter=_sls_filter,
+        level=LoggingConfig.log_level,
+        enqueue=False
+    )
+
+
 def get_module_logger(module_name: str,
                       log_level: Optional[LOG_LEVEL] = None) -> GeneralLogger:
     """Get a module dedicated logger.
@@ -196,6 +221,14 @@ def add_sink(sink, log_level: Optional[LOG_LEVEL] = None) -> bool:
     return False


+def is_in_coroutine_context():
+    try:
+        asyncio.current_task()
+        return True
+    except RuntimeError:
+        return False
+
+
 def init_loggers(config_path: Optional[str] = None):
     """Parse config and initialize all loggers and handlers.

@@ -208,5 +241,8 @@ def init_loggers(config_path: Optional[str] = None):
     _add_std_out_handler()
     _add_error_log_handler()
     if LoggingConfig.log_extend_module_switch["sls_log"]:
-        _add_sls_log_handler()
+        if is_in_coroutine_context():
+            _add_sls_log_async_handler()
+        else:
+            _add_sls_log_handler()
     _add_standard_logger()
diff --git a/agentuniverse_extension/logger/sls_sink.py b/agentuniverse_extension/logger/sls_sink.py
index 815e6f81..01e30c9a 100644
--- a/agentuniverse_extension/logger/sls_sink.py
+++ b/agentuniverse_extension/logger/sls_sink.py
@@ -6,16 +6,123 @@
 # @Email   : fanen.lhy@antgroup.com
 # @FileName: sls_sink.py

+import asyncio
 import queue
 import threading
 import time
+from concurrent.futures import ThreadPoolExecutor
 from typing import List, Optional

+import loguru
 from aliyun.log.logclient import LogClient
 from aliyun.log.logitem import LogItem
 from aliyun.log.putlogsrequest import PutLogsRequest
 from aliyun.log.putlogsresponse import PutLogsResponse
-import loguru
+from loguru import logger
+
+
+class AsyncSlsSender:
+    def __init__(
+        self,
+        project: str,
+        log_store: str,
+        endpoint: str,
+        access_key_id: str,
+        access_key_secret: str,
+        queue_max_size: int = 10_000,
+        send_interval: float = 2.0,
+        batch_size: int = 256,
+        max_workers: int = 2,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+    ):
+        self.project = project
+        self.log_store = log_store
+        self._client = LogClient(endpoint, access_key_id, access_key_secret)
+
+        self._queue: asyncio.Queue[LogItem] = asyncio.Queue(queue_max_size)
+        self._send_interval = send_interval
+        self._batch_size = batch_size
+
+        self._loop = loop or asyncio.get_event_loop()
+        self._executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="sls_uploader")
+
+        self._bg_task: Optional[asyncio.Task] = None
+        self._shutdown = asyncio.Event()
+
+    # ---------- public API ----------
+    async def start(self) -> None:
+        """Start the background batch-sending task (idempotent)."""
+        if self._bg_task is None or self._bg_task.done():
+            self._bg_task = self._loop.create_task(self._worker())
+
+    async def put(self, item: LogItem, /) -> None:
+        """Asynchronously enqueue a log item; drop it when the queue is full (never block business coroutines)."""
+        try:
+            self._queue.put_nowait(item)
+        except asyncio.QueueFull:
+            logger.error("SLS log queue full – drop a log item")
+
+    async def aclose(self, timeout: Optional[float] = 5.0) -> None:
+        """
+        Graceful shutdown:
+        - stop the background task
+        - flush the remaining logs
+        - shut down the thread pool
+        """
+        if self._bg_task and not self._bg_task.done():
+            self._shutdown.set()
+            # Wait for the task to finish; cancel it if it has not finished in time.
+            try:
+                await asyncio.wait_for(self._bg_task, timeout)
+            except asyncio.TimeoutError:
+                self._bg_task.cancel()
+
+        self._executor.shutdown(wait=True)
+
+    # ---------- internal ----------
+    async def _worker(self) -> None:
+        """Background loop: collect logs in batches, then send them."""
+        try:
+            while not self._shutdown.is_set():
+                await self._flush_once()
+                try:
+                    # Wait for the next round, or wake up early on shutdown.
+                    await asyncio.wait_for(self._shutdown.wait(), timeout=self._send_interval)
+                except asyncio.TimeoutError:
+                    pass
+            # The program is about to exit, send whatever is left.
+            await self._drain_queue()
+        except asyncio.CancelledError:
+            pass  # avoid noisy logs on cancellation
+
+    async def _flush_once(self) -> None:
+        """Take at most batch_size log items and upload them."""
+        if self._queue.empty():
+            return
+
+        items: List[LogItem] = []
+        while len(items) < self._batch_size and not self._queue.empty():
+            try:
+                items.append(self._queue.get_nowait())
+            except asyncio.QueueEmpty:
+                break
+
+        if items:
+            await self._upload(items)
+
+    async def _drain_queue(self) -> None:
+        """Flush all remaining log items."""
+        while not self._queue.empty():
+            await self._flush_once()
+
+    async def _upload(self, items: List[LogItem]) -> Optional[PutLogsResponse]:
+        """Run the blocking put_logs call in the thread pool."""
+        def _blocking_upload() -> PutLogsResponse:
+            req = PutLogsRequest(self.project, self.log_store, "", "", items)
+            return self._client.put_logs(req)
+
+        return await self._loop.run_in_executor(self._executor, _blocking_upload)
+

 class SlsSender:
@@ -83,8 +190,7 @@ class SlsSender:
                                          source, log_item_list)
             put_response = self.client.put_logs(put_request)
         except Exception as e:
-            self._logger.error(
-                f"send single log to sls failed: {str(e)}", )
+            print(f"send single log to sls failed: {str(e)}")
             return None
         return put_response

@@ -183,6 +289,21 @@ class SlsSender:
             time.sleep(self.send_interval)


+# ------------------------- Loguru sink ------------------------- #
+class AsyncSlsSink:
+    """A custom loguru async sink used to send logs to aliyun sls."""
+
+    def __init__(self, sender: AsyncSlsSender):
+        self._sender = sender
+
+    async def __call__(self, message):
+        record = message.record
+        item = LogItem(
+            contents=[("content", message)],
+            timestamp=int(record["time"].timestamp())
+        )
+        await self._sender.put(item)
+
+
 class SlsSink:
     """A custom loguru sink used to send logs to aliyun sls."""
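Reviewer note: a sketch of wiring `AsyncSlsSender` and `AsyncSlsSink` into loguru by hand, mirroring what `_add_sls_log_async_handler` does. The project, endpoint, and credential values are placeholders, and the explicit `aclose()` call only illustrates where a graceful shutdown could hook in, since nothing in this diff awaits it yet.

```python
import asyncio

import loguru

from agentuniverse_extension.logger.sls_sink import AsyncSlsSender, AsyncSlsSink


async def main() -> None:
    # Placeholder SLS settings: replace with real project values.
    sender = AsyncSlsSender(
        project="my-project",
        log_store="my-log-store",
        endpoint="cn-hangzhou.log.aliyuncs.com",
        access_key_id="<access-key-id>",
        access_key_secret="<access-key-secret>",
        send_interval=2.0,
        batch_size=256,
    )
    await sender.start()  # spawn the background batching task on the running loop

    # Register the async sink; enqueue=False because AsyncSlsSender already
    # buffers items in its own asyncio queue.
    handler_id = loguru.logger.add(sink=AsyncSlsSink(sender), level="INFO", enqueue=False)

    loguru.logger.info("hello from the async sls sink")

    # Shutdown path: let loguru finish dispatching to the async sink,
    # detach it, then flush whatever is still queued and stop the worker.
    await loguru.logger.complete()
    loguru.logger.remove(handler_id)
    await sender.aclose(timeout=5.0)


if __name__ == "__main__":
    asyncio.run(main())
```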