From fb8a957cd5a4b7de5ce6f5668708de4de1c53712 Mon Sep 17 00:00:00 2001 From: "fanen.lhy" Date: Wed, 24 Apr 2024 16:58:15 +0800 Subject: [PATCH] Init extension aliyun sls log, service component and web base module. --- agentuniverse/agent_serve/service.py | 54 +++++ agentuniverse/agent_serve/service_configer.py | 87 ++++++++ agentuniverse/agent_serve/service_instance.py | 34 +++ agentuniverse/agent_serve/service_manager.py | 13 ++ agentuniverse/agent_serve/web/__init__.py | 7 + agentuniverse/agent_serve/web/dal/__init__.py | 7 + .../agent_serve/web/dal/entity/__init__.py | 7 + .../agent_serve/web/dal/entity/request_do.py | 29 +++ .../agent_serve/web/dal/request_library.py | 151 +++++++++++++ .../agent_serve/web/thread_with_result.py | 40 ++++ agentuniverse/agent_serve/web/web_booster.py | 84 ++++++++ agentuniverse/agent_serve/web/web_util.py | 74 +++++++ agentuniverse_extension/logger/__init__.py | 7 + agentuniverse_extension/logger/sls_sink.py | 201 ++++++++++++++++++ 14 files changed, 795 insertions(+) create mode 100644 agentuniverse/agent_serve/service.py create mode 100644 agentuniverse/agent_serve/service_configer.py create mode 100644 agentuniverse/agent_serve/service_instance.py create mode 100644 agentuniverse/agent_serve/service_manager.py create mode 100644 agentuniverse/agent_serve/web/__init__.py create mode 100644 agentuniverse/agent_serve/web/dal/__init__.py create mode 100644 agentuniverse/agent_serve/web/dal/entity/__init__.py create mode 100644 agentuniverse/agent_serve/web/dal/entity/request_do.py create mode 100644 agentuniverse/agent_serve/web/dal/request_library.py create mode 100644 agentuniverse/agent_serve/web/thread_with_result.py create mode 100644 agentuniverse/agent_serve/web/web_booster.py create mode 100644 agentuniverse/agent_serve/web/web_util.py create mode 100644 agentuniverse_extension/logger/__init__.py create mode 100644 agentuniverse_extension/logger/sls_sink.py diff --git a/agentuniverse/agent_serve/service.py b/agentuniverse/agent_serve/service.py new file mode 100644 index 00000000..49148919 --- /dev/null +++ b/agentuniverse/agent_serve/service.py @@ -0,0 +1,54 @@ +from typing import Optional + +from .service_configer import ServiceConfiger +from ..agent.agent import Agent +from ..base.config.application_configer.application_config_manager import ( + ApplicationConfigManager +) +from ..base.component.component_base import ComponentBase +from ..base.component.component_enum import ComponentEnum + + +class Service(ComponentBase): + """The basic class of the service.""" + + # Basic attributes of the service class. + component_type: ComponentEnum = ComponentEnum.SERVICE + name: Optional[str] = None + description: Optional[str] = None + agent: Optional[Agent] = None + + def __post_init_post_parse__(self): + """Init service code with service name.""" + self.__service_code: Optional[str] = self.get_instance_code() + + def get_instance_code(self) -> str: + """Generate the full service code from service name. """ + app_cfg_manager: ApplicationConfigManager = ApplicationConfigManager() + appname = app_cfg_manager.app_configer.base_info_appname + return f"{appname}.service.{self.name}" + + def initialize_by_component_configer(self, + service_configer: ServiceConfiger) \ + -> 'Service': + """Initialize the Service by the ComponentConfiger object. + + Args: + service_configer(ServiceConfiger): A configer contains service + basic info. + Returns: + Service: A Service instance. + """ + self.name = service_configer.name + self.description = service_configer.description + self.agent = service_configer.agent + return self + + def run(self, **kwargs) -> str: + """The executed function when the service is called.""" + return self.agent.run(**kwargs).to_json_str() + + @property + def service_code(self): + """The unique code of each service, generate from service name.""" + return self.__service_code diff --git a/agentuniverse/agent_serve/service_configer.py b/agentuniverse/agent_serve/service_configer.py new file mode 100644 index 00000000..fcb7c331 --- /dev/null +++ b/agentuniverse/agent_serve/service_configer.py @@ -0,0 +1,87 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/25 16:04 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: service_configer.py + +from typing import Optional + +from ..agent.agent import Agent +from ..agent.agent_manager import AgentManager +from ..base.config.component_configer.component_configer import ComponentConfiger +from ..base.config.configer import Configer + + +class ServiceConfiger(ComponentConfiger): + """The ServiceConfiger class, used to load and manage the service + configuration.""" + + _ComponentConfiger__metadata_class: Optional[str] = None + _ComponentConfiger__metadata_module: Optional[str] = None + + def __init__(self, configer: Optional[Configer] = None): + """Initialize the ServiceConfiger.""" + super().__init__(configer) + self.__name: Optional[str] = None + self.__description: Optional[str] = None + self.__agent: Optional[Agent] = None + self.__set_default_meta_info() + + @property + def name(self) -> Optional[str]: + """Name field.""" + return self.__name + + @property + def description(self) -> Optional[str]: + """Description field.""" + return self.__description + + @property + def agent(self) -> Optional[Agent]: + """Agent field.""" + return self.__agent + + def __set_default_meta_info(self): + """Set default instantiated class of service.""" + if (not hasattr(self, '_ComponentConfiger__metadata_module') + or self._ComponentConfiger__metadata_module is None): + self._ComponentConfiger__metadata_module = ("agentuniverse." + "agent_serve.service") + if (not hasattr(self, '_ComponentConfiger__metadata_class') + or self._ComponentConfiger__metadata_class is None): + self._ComponentConfiger__metadata_class = 'Service' + + def load(self) -> 'ServiceConfiger': + """Setting property using own configer member property. + + Returns: + ServiceConfiger: A ServiceConfiger instance. + """ + return self.load_by_configer(self.configer) + + def load_by_configer(self, configer: Configer) -> 'ServiceConfiger': + """Initialize self using given configer, get ServiceConfiger property + from it. + Args: + configer(Configer): A Configer instance. + Returns: + ServiceConfiger: A ServiceConfiger instance. + """ + super().load_by_configer(configer) + agent_code = configer.value.get('agent') + self.__set_default_meta_info() + try: + self.__name = configer.value.get('name') + self.__description = configer.value.get('description') + agent_manager: AgentManager = AgentManager() + self.__agent = agent_manager.get_instance_obj(agent_code) + if not self.__agent: + raise ValueError + except ValueError: + raise ValueError(f"No such Agent: {agent_code}") + except Exception as e: + raise Exception(f"Failed to parse the Agent configuration: {e}") + return self diff --git a/agentuniverse/agent_serve/service_instance.py b/agentuniverse/agent_serve/service_instance.py new file mode 100644 index 00000000..c742fd72 --- /dev/null +++ b/agentuniverse/agent_serve/service_instance.py @@ -0,0 +1,34 @@ +from .service import Service +from .service_manager import ServiceManager + + +class ServiceNotFoundError(Exception): + """An exception when service code is not in service manager.""" + def __init__(self, service_code: str): + super().__init__(f"Service {service_code} not found.") + self.service_code = service_code + + +class ServiceInstance(object): + """A service wrapper class, which should be directly called in project + instead of Service class.""" + + def __init__(self, service_code: str): + """Initialize a service instance. Raise an ServiceNotFoundError when + service code can't be found by servie manager. + + Args: + service_code (`str`): + Unique code of the service. + """ + self.__service_code = service_code + service_manager: ServiceManager = ServiceManager() + self.__service: Service = service_manager.get_instance_obj( + service_code + ) + if self.__service is None: + raise ServiceNotFoundError(service_code) + + def run(self, **kwargs) -> str: + """Call the service run.""" + return self.__service.run(**kwargs) diff --git a/agentuniverse/agent_serve/service_manager.py b/agentuniverse/agent_serve/service_manager.py new file mode 100644 index 00000000..f65fb7dc --- /dev/null +++ b/agentuniverse/agent_serve/service_manager.py @@ -0,0 +1,13 @@ +from ..base.annotation.singleton import singleton +from ..base.component.component_enum import ComponentEnum +from ..base.component.component_manager_base import ComponentManagerBase +from .service import Service + + +@singleton +class ServiceManager(ComponentManagerBase[Service]): + """A singleton manager class of the service.""" + + def __init__(self): + super().__init__(ComponentEnum.SERVICE) + diff --git a/agentuniverse/agent_serve/web/__init__.py b/agentuniverse/agent_serve/web/__init__.py new file mode 100644 index 00000000..e8eb8907 --- /dev/null +++ b/agentuniverse/agent_serve/web/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/25 11:14 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse/agent_serve/web/dal/__init__.py b/agentuniverse/agent_serve/web/dal/__init__.py new file mode 100644 index 00000000..b5247c66 --- /dev/null +++ b/agentuniverse/agent_serve/web/dal/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/26 15:15 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse/agent_serve/web/dal/entity/__init__.py b/agentuniverse/agent_serve/web/dal/entity/__init__.py new file mode 100644 index 00000000..7cea3606 --- /dev/null +++ b/agentuniverse/agent_serve/web/dal/entity/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/26 15:16 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse/agent_serve/web/dal/entity/request_do.py b/agentuniverse/agent_serve/web/dal/entity/request_do.py new file mode 100644 index 00000000..327171e5 --- /dev/null +++ b/agentuniverse/agent_serve/web/dal/entity/request_do.py @@ -0,0 +1,29 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/26 15:17 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: request_do.py + +import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +class RequestDO(BaseModel): + """Data Object class of an agent service request.""" + + id: Optional[int] = Field(description="ID", default=None) + request_id: str = Field(description="Unique request id.") + session_id: str = Field(description="Session id of the request.") + query: str = Field(description="The query contents.") + state: str = Field(description="State of the request.") + result: dict = Field(description="Exec result.") + steps: list = Field(description="Exec steps.") + additional_args: dict = Field(description="Additional info.") + gmt_create: Optional[datetime.datetime] = Field( + description="Create time", default_factory=datetime.datetime.now) + gmt_modified: Optional[datetime.datetime] = Field( + description="Modified time", default_factory=datetime.datetime.now) diff --git a/agentuniverse/agent_serve/web/dal/request_library.py b/agentuniverse/agent_serve/web/dal/request_library.py new file mode 100644 index 00000000..6b58486e --- /dev/null +++ b/agentuniverse/agent_serve/web/dal/request_library.py @@ -0,0 +1,151 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/26 15:18 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: request_library.py + +import datetime +import json + +from sqlalchemy import JSON, Integer, String, DateTime, Column, create_engine +from sqlalchemy import select +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + +from .entity.request_do import RequestDO +from agentuniverse.base.util.system_util import get_project_root_path +from agentuniverse.base.config.configer import Configer +from agentuniverse.base.annotation.singleton import singleton + +REQUEST_TABLE_NAME = 'request_task' +Base = declarative_base() + + +class RequestORM(Base): + """SQLAlchemy ORM Model for RequestDO.""" + __tablename__ = REQUEST_TABLE_NAME + id = Column(Integer, primary_key=True, autoincrement=True) + request_id = Column(String, nullable=False) + query = Column(String) + session_id = Column(String) + state = Column(String) + result = Column(JSON) + steps = Column(JSON) + additional_args = Column(JSON) + gmt_create = Column(DateTime, default=datetime.datetime.now) + gmt_modified = Column(DateTime, default=datetime.datetime.now, + onupdate=datetime.datetime.now) + + +@singleton +class RequestLibrary: + def __init__(self, configer: Configer = None): + """Init the database connection. Use uri in config file or use sqlite + as default database.""" + mysql_uri = None + if Configer: + mysql_uri = configer.get('DB', {}).get('mysql_uri') + if mysql_uri and mysql_uri.strip(): + db_uri = mysql_uri + else: + db_path = get_project_root_path() / 'DB' / 'agent_framework.db' + db_path.parent.mkdir(parents=True, exist_ok=True) + db_uri = f'sqlite:////{db_path}' + + # Create database engine + self.engine = create_engine( + db_uri, + _json_serializer=lambda x: json.dumps(x, ensure_ascii=False) + ) + self.Session = sessionmaker(bind=self.engine) + + with self.engine.connect() as conn: + if not conn.dialect.has_table(conn, REQUEST_TABLE_NAME): + Base.metadata.create_all(self.engine) + + def query_request_by_request_id(self, request_id: str) -> RequestDO | None: + """Get a RequestDO with given request_id. + + Args: + request_id(`str`): The unique request id of request task. + + Return: + The target RequestDO or none when no such data. + """ + session = self.Session() + try: + result = session.execute( + select(RequestORM).where(RequestORM.request_id == request_id) + ).scalars().first() + if not result: + return None + return self.__request_orm_to_do(result) + finally: + session.close() + + def add_request(self, request_do: RequestDO) -> int: + """Add the given RequestDO to database. + + Args: + request_do(`RequestDO`): A new RequestDO to be added. + + Return: + A int stands unique data id in table. + """ + session = self.Session() + try: + request_orm = RequestORM(**request_do.model_dump()) + session.add(request_orm) + session.commit() + return request_orm.id + finally: + session.close() + + def update_request(self, request_do: RequestDO): + """Update the request data with same request id as the given + RequestDO.""" + session = self.Session() + try: + db_request_do = session.query(RequestORM).filter( + RequestORM.request_id == request_do.request_id).first() + if db_request_do: + update_data = request_do.model_dump(exclude_unset=True) + for key, value in update_data.items(): + setattr(db_request_do, key, value) + session.commit() + session.refresh(db_request_do) + finally: + session.close() + + def update_gmt_modified(self, request_id: str): + """Update the request task latest active time.""" + session = self.Session() + try: + db_request_do = session.query(RequestORM).filter( + RequestORM.request_id == request_id).first() + if db_request_do: + setattr(db_request_do, "gmt_modified", datetime.datetime.now()) + session.commit() + session.refresh(db_request_do) + finally: + session.close() + + def __request_orm_to_do(self, request_orm: RequestORM) -> RequestDO: + """Transfer a RequestORM to RequestDO.""" + request_obj = RequestDO( + request_id='', + session_id="", + query='', + state='', + result=dict(), + steps=[], + additional_args=dict(), + gmt_create=datetime.datetime.now(), + gmt_modified=datetime.datetime.now(), + ) + for column in request_orm.__table__.columns: + setattr(request_obj, column.name, + getattr(request_orm, column.name)) + return request_obj diff --git a/agentuniverse/agent_serve/web/thread_with_result.py b/agentuniverse/agent_serve/web/thread_with_result.py new file mode 100644 index 00000000..96414c9c --- /dev/null +++ b/agentuniverse/agent_serve/web/thread_with_result.py @@ -0,0 +1,40 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/26 14:31 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: thread_with_result.py + +from threading import Thread + + +class ThreadWithReturnValue(Thread): + """A thread can save the target func exec result.""" + def __init__(self, group=None, target=None, name=None, + args=(), kwargs=None): + super().__init__(group, target, name, args, kwargs) + + if kwargs is None: + kwargs = {} + self.kwargs = kwargs + self.args = args + self.target = target + self._return = None + self.error = None + + def run(self): + """Run the target func and save result in _return.""" + if self.target is not None: + try: + self._return = self.target(*self.args, **self.kwargs) + except Exception as e: + self.error = e + + def result(self): + """Wait for target func finished, then return the result or raise an + error.""" + self.join() + if self.error is not None: + raise self.error + return self._return diff --git a/agentuniverse/agent_serve/web/web_booster.py b/agentuniverse/agent_serve/web/web_booster.py new file mode 100644 index 00000000..ef3d628e --- /dev/null +++ b/agentuniverse/agent_serve/web/web_booster.py @@ -0,0 +1,84 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/28 10:18 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: web_booster.py + +import tomli +from gunicorn.app.base import BaseApplication + +from .web_server import app +from agentuniverse.base.annotation.singleton import singleton + + +DEFAULT_GUNICORN_CONFIG = { + 'bind': '127.0.0.1:8000', + 'workers': 5, + 'backlog': 2048, + 'worker_class': 'gthread', + 'threads': 4, + 'timeout': 60, + 'keepalive': 10 +} + + +@singleton +class GunicornApplication(BaseApplication): + """Use gunicorn to wrap the flask web server.""" + def __init__(self, config_path: str = None): + self.options = {} + if config_path: + self.__load_config_from_file(config_path) + else: + self.default_config = None + self.application = app + super().__init__() + + def load_config(self): + """Check the config file first, use default config while config file + not exist, then overwrite parts which in options.""" + if not self.default_config: + config = DEFAULT_GUNICORN_CONFIG + else: + config = self.default_config + for key, value in config.items(): + if key in self.cfg.settings and value is not None: + self.cfg.set(key.lower(), value) + + # The priority of the passed arguments supersedes that of config file. + for key, value in self.options.items(): + if key in self.cfg.settings and value is not None: + self.cfg.set(key.lower(), value) + + def update_config(self, options: dict): + self.options = options + self.load_config() + + def load(self): + return self.application + + def __load_config_from_file(self, config_path: str): + """Load gunicorn config file.""" + try: + with open(config_path, 'rb') as f: + config = tomli.load(f)["GUNICORN_CONFIG"] + except (FileNotFoundError, TypeError): + print("can't find gunicorn config file, use default config") + return + except (tomli.TOMLDecodeError, KeyError): + print("gunicorn config file isn't a valid toml, " + "use default config.") + return + + self.default_config = { + key: value for key, value in config.items() + } + + +def start_web_server(**kwargs): + """Start func of flask server. Accept input arguments to overwrite default + gunicorn config.""" + GunicornApplication().update_config(kwargs) + GunicornApplication().run() \ No newline at end of file diff --git a/agentuniverse/agent_serve/web/web_util.py b/agentuniverse/agent_serve/web/web_util.py new file mode 100644 index 00000000..d352d98b --- /dev/null +++ b/agentuniverse/agent_serve/web/web_util.py @@ -0,0 +1,74 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/26 10:34 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: web_util.py + +import inspect +import queue +import json + +from flask import request, make_response, jsonify + +from ..service_instance import ServiceInstance + + +def request_param(func): + """An annotation used to parse the flask request params.""" + def wrapper(*args, **kwargs): + if request.method == "GET": + req_data = request.args.to_dict() + # Get the post params from body according to different content type. + else: + if "application/json" in request.headers.get("Content-Type"): + raw_data = request.data.decode('utf-8') + req_data = json.loads(raw_data) + else: + req_data = request.form.to_dict() + # Get the func arguments name and type. + sig = inspect.signature(func) + for name, param in sig.parameters.items(): + if name == "kwargs": + for key in req_data: + if key not in kwargs: + kwargs[key] = req_data[key] + continue + if name == "session_id": + kwargs[name] = request.headers.get("X-Session-Id") + elif param.annotation in (str, int, dict, list): + kwargs[name] = req_data.get(name) + else: + kwargs[name] = param.annotation(**req_data) + return func(*args, **kwargs) + + wrapper.__name__ = func.__name__ + return wrapper + + +def service_run_queue(service_id, **kwargs): + """The func used in a separate thread to run an agent service. The result + will be saved in a queue if one is provided.""" + stream: queue.Queue = kwargs.get('output_stream') + try: + res = ServiceInstance(service_id).run(**kwargs) + return res + finally: + if stream: + stream.put_nowait('{"type": "EOF"}') + + +def make_standard_response(success: bool, + result=None, + message: str = None, + request_id: str = None, + status_code=200): + """Construct a standard flask response.""" + response_data = { + "success": success, + "result": result, + "message": message, + "request_id": request_id + } + return make_response(jsonify(response_data), status_code) diff --git a/agentuniverse_extension/logger/__init__.py b/agentuniverse_extension/logger/__init__.py new file mode 100644 index 00000000..22862f86 --- /dev/null +++ b/agentuniverse_extension/logger/__init__.py @@ -0,0 +1,7 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/15 14:28 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: __init__.py.py diff --git a/agentuniverse_extension/logger/sls_sink.py b/agentuniverse_extension/logger/sls_sink.py new file mode 100644 index 00000000..815e6f81 --- /dev/null +++ b/agentuniverse_extension/logger/sls_sink.py @@ -0,0 +1,201 @@ +# !/usr/bin/env python3 +# -*- coding:utf-8 -*- + +# @Time : 2024/3/12 14:04 +# @Author : fanen.lhy +# @Email : fanen.lhy@antgroup.com +# @FileName: sls_sink.py + +import queue +import threading +import time +from typing import List, Optional + +from aliyun.log.logclient import LogClient +from aliyun.log.logitem import LogItem +from aliyun.log.putlogsrequest import PutLogsRequest +from aliyun.log.putlogsresponse import PutLogsResponse +import loguru + + +class SlsSender: + """A class to send log to aliyun simple log server.""" + + def __init__( + self, + project: str, + log_store: str, + endpoint: str, + access_key_id: str, + access_key_secret: str, + queue_max_size: int, + send_interval: float): + """Initialize a sls sender. + + Args: + project (`str`): + Project name of aliyun sls. + log_store (`str`): + Log store of aliyun sls. + endpoint (`str`): + Endpoint of aliyun sls. + access_key_id (`str`): + Project name of aliyun sls. + access_key_secret (`str`): + Project name of aliyun sls. + queue_max_size (`int`): + Log queue max size, sls sender use a queue to save the logs + to be sent, a separate thread will upload logs to aliyun sls + periodically. + send_interval (`float`): + Interval of the separate thread sending logs to aliyun sls. + """ + self.project = project + self.log_store = log_store + self.client = LogClient(endpoint, access_key_id, access_key_secret) + self.log_queue = queue.Queue(queue_max_size) + self.send_interval = send_interval + + self.send_thread_stop_event = threading.Event() + self.send_thread = None + self._logger = loguru.logger + + def _send_put_logs_request(self, + log_item_list: List[LogItem], + topic: str = "", + source: str = "") -> Optional[PutLogsResponse]: + """Send a batch of logs to aliyun sls. + + Args: + log_item_list (`List[LogItem]`): + A list of log items to be sent to aliyun sls. + topic (`str`, defaults to `""`): + An attribute used to identify a group of logs in aliyun sls. + source (`str`, defaults to `""`): + An identifier that allows to discern the source of the log. + + Returns: + If logs sent successfully, returns a PutLogsResponse, else returns + none and log the error in local log file. + """ + try: + put_request = PutLogsRequest(self.project, self.log_store, topic, + source, log_item_list) + put_response = self.client.put_logs(put_request) + except Exception as e: + self._logger.error( + f"send single log to sls failed: {str(e)}", ) + return None + return put_response + + def send_single_log(self, + message: str, + topic: str = "", + source: str = "") -> Optional[PutLogsResponse]: + """Send a single log to aliyun sls. + + Args: + message (`str`): + The message to be sent to aliyun sls. + topic (`str`, defaults to `""`): + An attribute used to identify a group of logs in aliyun sls. + source (`str`, defaults to `""`): + An identifier that allows to discern the source of the log. + + Returns: + If logs sent successfully, returns a PutLogsResponse, else returns + none and log the error in local log file. + """ + log_item_list = list() + log_item = LogItem() + log_item.set_contents(message) + log_item_list.append(log_item) + return self._send_put_logs_request(log_item_list, topic, source) + + def put_log_queue(self, log_item: LogItem): + """Put a single log item into the waiting queue. + + Args: + log_item (`LogItem`): + The log item to be put into the queue. + """ + try: + self.log_queue.put(log_item, block=False) + except queue.Full: + self._logger.error("sls log queue is full, discard new log") + + def batch_send(self, + topic: str = "", + source: str = "") -> Optional[PutLogsResponse]: + """Send all log items in waiting queue to aliyun sls. + + Args: + topic (`str`, defaults to `""`): + An attribute used to identify a group of logs in aliyun sls. + source (`str`, defaults to `""`): + An identifier that allows to discern the source of the log. + + Returns: + If logs sent successfully, returns a PutLogsResponse, else returns + none and log the error in local log file. + """ + + # Get all log items in waiting queue. + size = self.log_queue.qsize() + log_item_list = [] + if self.log_queue is not None and size > 0: + for i in range(size): + try: + log_item = self.log_queue.get(block=False) + except queue.Empty: + self._logger.error( + "sls log queue shorter than expected, " + "all logs have been sent") + break + log_item_list.append(log_item) + + # Send all log items to aliyun sls. + length = len(log_item_list) + if length > 0: + return self._send_put_logs_request(log_item_list, topic, source) + return None + + def start_batch_send_thread(self): + """Start the log sending thread.""" + if self.send_thread is None or not self.send_thread.is_alive(): + self.send_thread_stop_event.clear() + self.send_thread = threading.Thread( + target=self._schedule_send_log, + name="loop_send_log_thread", daemon=True) + self.send_thread.start() + + def stop_batch_send_thread(self): + """Stop the log sending thread.""" + if self.send_thread is not None: + self.send_thread_stop_event.set() + self.send_thread.join() + self.send_thread = None + + def _schedule_send_log(self): + """Create an infinite loop uploading logs in queue to aliyun sls.""" + while not self.send_thread_stop_event.is_set(): + self.batch_send() + time.sleep(self.send_interval) + + +class SlsSink: + """A custom loguru sink used to send logs to aliyun sls.""" + + def __init__(self, sls_sender: SlsSender): + """Initialize the sls sink.""" + self.sls_sender = sls_sender + + def __call__(self, message): + """Construct the message to a sls log item and put it to the queue + waiting for sls sender to send to the aliyun sls.""" + log_item = LogItem() + log_item.set_time(int(message.record["time"].timestamp())) + log_content = list() + log_content.append(("content", str(message))) + log_item.set_contents(log_content) + self.sls_sender.put_log_queue(log_item)