Feature: LOGGER supports async methods; add SLS async sink

AniviaTn
2025-06-13 11:01:12 +08:00
parent 3a3a880319
commit 1fe867f012
3 changed files with 185 additions and 4 deletions

View File

@@ -167,6 +167,10 @@ class GeneralLogger(Logger):
            context_prefix=get_context_prefix()
        ).warning(msg, *args, **kwargs)

    async def awarn(self, msg, *args, **kwargs):
        self.warn(msg, *args, **kwargs)
        await self._logger.complete()

    def info(self, msg, *args, **kwargs):
        self._logger.opt(depth=self.get_inheritance_depth()).bind(
            log_type=LogTypeEnum.default,
@@ -174,6 +178,10 @@ class GeneralLogger(Logger):
            context_prefix=get_context_prefix()
        ).info(msg, *args, **kwargs)

    async def ainfo(self, msg, *args, **kwargs):
        self.info(msg, *args, **kwargs)
        await self._logger.complete()

    def error(self, msg, *args, **kwargs):
        self._logger.opt(depth=self.get_inheritance_depth()).bind(
            log_type=LogTypeEnum.default,
@@ -181,6 +189,10 @@ class GeneralLogger(Logger):
            context_prefix=get_context_prefix()
        ).error(msg, *args, **kwargs)

    async def aerror(self, msg, *args, **kwargs):
        self.error(msg, *args, **kwargs)
        await self._logger.complete()

    def critical(self, msg, *args, **kwargs):
        self._logger.opt(depth=self.get_inheritance_depth()).bind(
            log_type=LogTypeEnum.default,
@@ -188,6 +200,10 @@ class GeneralLogger(Logger):
            context_prefix=get_context_prefix()
        ).critical(msg, *args, **kwargs)

    async def acritical(self, msg, *args, **kwargs):
        self.critical(msg, *args, **kwargs)
        await self._logger.complete()

    def trace(self, msg, *args, **kwargs):
        self._logger.opt(depth=self.get_inheritance_depth()).bind(
            log_type=LogTypeEnum.default,
@@ -195,6 +211,10 @@ class GeneralLogger(Logger):
            context_prefix=get_context_prefix()
        ).trace(msg, *args, **kwargs)

    async def atrace(self, msg, *args, **kwargs):
        self.trace(msg, *args, **kwargs)
        await self._logger.complete()

    def debug(self, msg, *args, **kwargs):
        self._logger.opt(depth=self.get_inheritance_depth()).bind(
            log_type=LogTypeEnum.default,
@@ -202,6 +222,10 @@ class GeneralLogger(Logger):
            context_prefix=get_context_prefix()
        ).debug(msg, *args, **kwargs)

    async def adebug(self, msg, *args, **kwargs):
        self.debug(msg, *args, **kwargs)
        await self._logger.complete()

    def _add_handler(self, log_level: LOG_LEVEL = "INFO"):
        """Add a new loguru log handler, use instance module name to filter out
        message recorded by this instance.

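Each a* variant logs through its synchronous counterpart and then awaits loguru's logger.complete(), which drains any coroutine sinks scheduled on the running loop. A minimal sketch of calling these methods from a coroutine; the import path of get_module_logger is an assumption, not confirmed by this diff:

import asyncio
# Hypothetical import path; the diff only shows that get_module_logger exists.
from agentuniverse.base.util.logging.logging_util import get_module_logger

async def handle_request():
    logger = get_module_logger("demo")
    # ainfo() emits the record synchronously, then awaits
    # logger.complete() so coroutine sinks (e.g. the SLS one) flush.
    await logger.ainfo("request handled")
    await logger.aerror("downstream call failed")

asyncio.run(handle_request())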
View File

@@ -11,6 +11,7 @@ import sys
from typing import Optional
from pathlib import Path
from typing_extensions import deprecated
import asyncio
import loguru
@@ -122,6 +123,30 @@ def _add_sls_log_handler():
    )


def _add_sls_log_async_handler():
    """Add a handler that records all sls-type logs asynchronously."""
    from agentuniverse_extension.logger.sls_sink import AsyncSlsSink, AsyncSlsSender
    sls_sender = AsyncSlsSender(LoggingConfig.sls_project,
                                LoggingConfig.sls_log_store,
                                LoggingConfig.sls_endpoint,
                                LoggingConfig.access_key_id,
                                LoggingConfig.access_key_secret,
                                LoggingConfig.sls_log_queue_max_size,
                                LoggingConfig.sls_log_send_interval)
    # Start the sender's background worker on the current event loop.
    loop = asyncio.get_event_loop_policy().get_event_loop()
    loop.create_task(sls_sender.start())

    def _sls_filter(record):
        return record["extra"].get('log_type') in (LogTypeEnum.default, LogTypeEnum.sls)

    loguru.logger.add(
        sink=AsyncSlsSink(sls_sender),
        format=LoggingConfig.log_format,
        filter=_sls_filter,
        level=LoggingConfig.log_level,
        enqueue=False
    )
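With enqueue=False, loguru spins up no worker of its own: each record that passes _sls_filter makes loguru schedule the coroutine sink on the running event loop, and those tasks are only awaited when logger.complete() is awaited, which is exactly what the new a* methods do. A self-contained sketch of that contract, using a generic sink with no SLS dependency:

import asyncio
from loguru import logger

async def demo_sink(message):
    # Stand-in for AsyncSlsSink.__call__: pretend to do async I/O.
    await asyncio.sleep(0)
    print("sent:", message, end="")

logger.add(demo_sink, enqueue=False)

async def main():
    logger.info("hello")     # schedules demo_sink(message) on the loop
    await logger.complete()  # waits for every scheduled sink coroutine

asyncio.run(main())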
def get_module_logger(module_name: str,
                      log_level: Optional[LOG_LEVEL] = None) -> GeneralLogger:
    """Get a module dedicated logger.
@@ -196,6 +221,14 @@ def add_sink(sink, log_level: Optional[LOG_LEVEL] = None) -> bool:
    return False


def is_in_coroutine_context():
    """Return True when called from inside a running event loop."""
    try:
        asyncio.current_task()
        return True
    except RuntimeError:
        # asyncio.current_task() raises RuntimeError when no loop is running.
        return False


def init_loggers(config_path: Optional[str] = None):
    """Parse config and initialize all loggers and handlers.
@@ -208,5 +241,8 @@ def init_loggers(config_path: Optional[str] = None):
    _add_std_out_handler()
    _add_error_log_handler()
    if LoggingConfig.log_extend_module_switch["sls_log"]:
-        _add_sls_log_handler()
+        if is_in_coroutine_context():
+            _add_sls_log_async_handler()
+        else:
+            _add_sls_log_handler()
    _add_standard_logger()
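The branch above works because asyncio.current_task() requires a running event loop and raises RuntimeError otherwise, so is_in_coroutine_context() distinguishes plain synchronous startup from startup inside a coroutine. A quick illustration, assuming the function is imported from this module:

import asyncio

print(is_in_coroutine_context())      # False: no event loop is running yet

async def main():
    print(is_in_coroutine_context())  # True: a loop runs inside asyncio.run()

asyncio.run(main())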

View File

@@ -6,16 +6,123 @@
# @Email : fanen.lhy@antgroup.com
# @FileName: sls_sink.py
import asyncio
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

import loguru
from aliyun.log.logclient import LogClient
from aliyun.log.logitem import LogItem
from aliyun.log.putlogsrequest import PutLogsRequest
from aliyun.log.putlogsresponse import PutLogsResponse
from loguru import logger
class AsyncSlsSender:
    def __init__(
        self,
        project: str,
        log_store: str,
        endpoint: str,
        access_key_id: str,
        access_key_secret: str,
        queue_max_size: int = 10_000,
        send_interval: float = 2.0,
        batch_size: int = 256,
        max_workers: int = 2,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        self.project = project
        self.log_store = log_store
        self._client = LogClient(endpoint, access_key_id, access_key_secret)
        self._queue: asyncio.Queue[LogItem] = asyncio.Queue(queue_max_size)
        self._send_interval = send_interval
        self._batch_size = batch_size
        self._loop = loop or asyncio.get_event_loop()
        self._executor = ThreadPoolExecutor(max_workers=max_workers,
                                            thread_name_prefix="sls_uploader")
        self._bg_task: Optional[asyncio.Task] = None
        self._shutdown = asyncio.Event()
    # ---------- public API ----------
    async def start(self) -> None:
        """Start the background batch-sending task (idempotent)."""
        if self._bg_task is None or self._bg_task.done():
            self._bg_task = self._loop.create_task(self._worker())

    async def put(self, item: LogItem, /) -> None:
        """Enqueue a log item asynchronously; drop it when the queue is full
        (never block business coroutines)."""
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull:
            logger.error("SLS log queue is full, dropping a log item")

    async def aclose(self, timeout: float | None = 5.0) -> None:
        """Shut down gracefully:
        - stop the background task
        - flush the remaining logs
        - shut down the thread pool
        """
        if self._bg_task and not self._bg_task.done():
            self._shutdown.set()
            # Wait for the task to finish; cancel it if the timeout expires.
            try:
                await asyncio.wait_for(self._bg_task, timeout)
            except asyncio.TimeoutError:
                self._bg_task.cancel()
        self._executor.shutdown(wait=True)

    # ---------- internal ----------
    async def _worker(self) -> None:
        """Background loop: collect logs in batches, then send them."""
        try:
            while not self._shutdown.is_set():
                await self._flush_once()
                try:
                    # Sleep until the next round, or wake early on shutdown.
                    await asyncio.wait_for(self._shutdown.wait(),
                                           timeout=self._send_interval)
                except asyncio.TimeoutError:
                    pass
            # The program is about to exit; send whatever is left.
            await self._drain_queue()
        except asyncio.CancelledError:
            pass  # Swallow cancellation to avoid noisy logs.

    async def _flush_once(self) -> None:
        """Take up to batch_size log items and upload them."""
        if self._queue.empty():
            return
        items: List[LogItem] = []
        while len(items) < self._batch_size and not self._queue.empty():
            try:
                items.append(self._queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        if items:
            await self._upload(items)

    async def _drain_queue(self) -> None:
        """Flush all remaining log items."""
        while not self._queue.empty():
            await self._flush_once()

    async def _upload(self, items: List[LogItem]) -> Optional[PutLogsResponse]:
        """Run the blocking put_logs call in the thread pool."""
        def _blocking_upload() -> PutLogsResponse:
            req = PutLogsRequest(self.project, self.log_store, "", "", items)
            return self._client.put_logs(req)
        return await self._loop.run_in_executor(self._executor, _blocking_upload)
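Taken together, the sender's lifecycle is start() → put() → aclose(). A minimal usage sketch; the endpoint and credentials below are placeholders, not values from this commit:

import asyncio
import time
from aliyun.log.logitem import LogItem

async def main():
    sender = AsyncSlsSender(
        project="my-project",                      # placeholder values
        log_store="my-store",
        endpoint="cn-hangzhou.log.aliyuncs.com",
        access_key_id="AKID...",
        access_key_secret="SECRET...",
    )
    await sender.start()   # spawn the background worker on the running loop
    await sender.put(LogItem(contents=[("content", "hello sls")],
                             timestamp=int(time.time())))
    await sender.aclose()  # flush the queue, then stop the worker and pool

asyncio.run(main())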
class SlsSender:
@@ -83,8 +190,7 @@ class SlsSender:
                source, log_item_list)
            put_response = self.client.put_logs(put_request)
        except Exception as e:
-            self._logger.error(
-                f"send single log to sls failed: {str(e)}", )
+            print(f"send single log to sls failed: {str(e)}")
            return None
        return put_response
@@ -183,6 +289,21 @@ class SlsSender:
            time.sleep(self.send_interval)


# ------------------------- Loguru sink ------------------------- #
class AsyncSlsSink:
    """A coroutine loguru sink that forwards records to an AsyncSlsSender."""

    def __init__(self, sender: AsyncSlsSender):
        self._sender = sender

    async def __call__(self, message):
        record = message.record
        item = LogItem(
            contents=[("content", message)],
            timestamp=int(record["time"].timestamp())
        )
        await self._sender.put(item)


class SlsSink:
    """A custom loguru sink used to send logs to aliyun sls."""