Note
由于受本人水平所限,本文章所述可能不够准确,且内容表述比较主观,还请读者包涵qwq
本文正文部分会随时更新/重写正文中的任意部分,关心每次更新变动的读者看下diff就好啦
若无特殊说明,本文中的Nonebot2代码均来源于v2.3.1
本文作者在23年11月的时候第一次接触Nonebot2,当时便被Nonebot2以及它庞大的生态系统所吸引,当时便想读一遍Nonebot2的源码,但受于时间和条件所限,当时只看了一点ob11适配器的内容就give up了
今年六月份,由于各种机缘巧合,以及本人有了些许空闲时间,便捡起来了去年末的这个梦想(
众所周知,bot.py(或类似的结构)是Nonebot2启动的脚手架,那我们就从bot.py看起
import nonebot
from nonebot.adapters.onebot.v11 import Adapter as Onebot11Adapter
nonebot.init()
driver = nonebot.get_driver()
driver.register_adapter(Onebot11Adapter)
if __name__ == "__main__":
nonebot.run()
不难看出Driver,Adapter,Bot这三部分从前到后耦合在一起,组成了Nonebot
driver整体的架构如下:
│ ├── driver
│ │ ├── __init__.py //导入文件
│ │ ├── _lifespan.py // 看上去只负责钩子函数的生命周期(即在特定的时间点,比如startup,ready,shutdown执行哪些函数,被abstract中的class Driver(abc.ABC)所覆盖
│ │ ├── abstract.py //各种抽象类的接口,具体见下
│ │ ├── combine.py // 负责driver的混入
│ │ └── model.py //TODO:待填坑,似乎是为了抹平不同driver impl所创作的模型
具体可以拆分为如下部分:
class Driver(abc.ABC) // {lifespan} main base [fastapi quart None] -> fastapi/quart
> class HTTPClientSession(abc.ABC) // {HttpDriver会话} -> aiohttp/httpx -->>[创建长session]
class Mixin(abc.ABC) // mixin base
|- class ForwardMixin(Mixin) // {ForwardDriver}
|- class HTTPClientMixin(ForwardMixin) -> aiohttp/httpx -->>[借助于class HTTPClientSession(abc.ABC)实现长session下请求]
|- class WebSocketClientMixin(ForwardMixin) -> aiohttp/websockets
|- class ReverseMixin(Mixin) // {ReverseDriver}
|- class ASGIMixin(ReverseMixin) -> fastapi/quart
Driver具有启动整个Nonebot的接口(run),具有注册Nonebot整体生命周期钩子函数(on_startup,on_shutdown)的接口,具有注册Bot生命周期钩子函数(on_bot_connect,on_bot_disconnect)。同时Driver中存储了adapter。
Driver具有以下特色:
-
默认情况下使用FastAPI作为driver
-
不同的driver通过mixin机制可以混搭(组合)
-
mixin机制实现在nonebot/internal/driver/combine.py,很有意思的实现。
具体是:
-
加载原始,完整继承 class Driver(abc.ABC)(也就是实现了lifespan)的DriverClass
ref: nonebot2/nonebot/init.py at v2.3.1#L254 · nonebot/nonebot2 (github.com)
-
仅mixin元素本身自己混入一遍(例如aiohttp,websockets),使用签名(NoneDriver和一个mixin进行混入) ref: nonebot2/nonebot/init.py at v2.3.1#L261 · nonebot/nonebot2 (github.com)(加载混入对象)
ref: nonebot2/nonebot/drivers/aiohttp.py at v2.3.1#276 · nonebot/nonebot2 (github.com)(通过combine_driver混入NoneDriver从而导出Driver)
def combine_driver( driver: type[D], *mixins: type[Mixin] ) -> Union[type[D], type["CombinedDriver"]]:
-
使用combine_driver将1中完整继承 class Driver(abc.ABC)的class和mixin元素们进行最终混入
ref: nonebot2/nonebot/init.py at v2.3.1#265 · nonebot/nonebot2 (github.com)
最终,混成的对象遵循MRO原则,意味着,比如我使用
~fastapi+~websocket+~aiohttp
时,class Driver(abc.ABC)中的class WebSocketClientMixin(ForwardMixin)
来源于aiohttp,而不是websocket。>>> (*mixins, driver) (<class 'nonebot.drivers.websockets.Mixin'>, <class 'nonebot.drivers.aiohttp.Mixin'>, <class 'nonebot.drivers.fastapi.Driver'>) >>> DriverClass.__mro__ (<class 'abc.CombinedDriver'>, <class 'nonebot.drivers.websockets.Mixin'>, <class 'nonebot.drivers.aiohttp.Mixin'>, <class 'nonebot.internal.driver.abstract.HTTPClientMixin'>, <class 'nonebot.internal.driver.abstract.WebSocketClientMixin'>, <class 'nonebot.internal.driver.abstract.ForwardMixin'>, <class 'nonebot.drivers.fastapi.Driver'>, <class 'nonebot.internal.driver.abstract.Driver'>, <class 'nonebot.internal.driver.abstract.ASGIMixin'>, <class 'nonebot.internal.driver.abstract.ReverseMixin'>, <class 'nonebot.internal.driver.abstract.Mixin'>, <class 'abc.ABC'>, <class 'object'>)
-
-
-
FastAPI,Quart,None具有完整的生命周期管理,即继承了class Driver(abc.ABC) (aka BaseDriver),其还有自己的mixin。其余的Driver只能作为Mixin
-
FastAPI,Quart使用其内置的生命周期管理,None Driver内部纯手动实现生命周期管理
-
在Feature: 支持 HTTP 客户端会话 by yanyongyu · Pull Request #2627 · nonebot/nonebot2 (github.com)中,将HTTPClientMixin拆分为 class HTTPClientMixin(ForwardMixin) (实际只调用session层)和class HTTPClientSession(abc.ABC)(实际的session层)
以达到复用session的作用(由于httpx,不复用session的话效率很低)
ref: Defect: 使用
httpx
情况下ForwardDriver.request
在并发时存在性能问题 · Issue #2617 · nonebot/nonebot2 (github.com)总体来看,这种抽象方式确实很抽象,实现了driver的插拔自由
而且在最近一次的结构调整中,将http拆分出client和session也是很不错的,将httpx的性能提升到了aiohttp一样的水平,而且这样的更改也不为过。看来httpx是一个
很适合无脑使用的网络请求库(?虽然没有抽象出kirtor这种基于grpc的协议,也正常。
四年前的我们怎能想到还需要抽象出grpc
在Driver的最原始的基类,也有着管理Bot和Adapter的职责,毕竟Nonebot的生命周期全权由Driver管理。
class Driver(abc.ABC):
"""驱动器基类。
驱动器控制框架的启动和停止,适配器的注册,以及机器人生命周期管理。
参数:
env: 包含环境信息的 Env 对象
config: 包含配置信息的 Config 对象
"""
_adapters: ClassVar[dict[str, "Adapter"]] = {}
"""已注册的适配器列表"""
_bot_connection_hook: ClassVar[set[Dependent[Any]]] = set()
"""Bot 连接建立时执行的函数"""
_bot_disconnection_hook: ClassVar[set[Dependent[Any]]] = set()
"""Bot 连接断开时执行的函数"""
def __init__(self, env: Env, config: Config):
self.env: str = env.environment
"""环境名称"""
self.config: Config = config
"""全局配置对象"""
self._bots: dict[str, "Bot"] = {}
self._bot_tasks: set[asyncio.Task] = set()
self._lifespan = Lifespan()
导出函数:
@property
def bots(self) -> dict[str, "Bot"]:
# 注册适配器,在bot.py使用
def register_adapter(self, adapter: type["Adapter"], **kwargs) -> None:
@property
@abc.abstractmethod
def type(self) -> str:
"""驱动类型名称"""
raise NotImplementedError
@property
@abc.abstractmethod
def logger(self):
# driver,启动!
@abc.abstractmethod
def run(self, *args, **kwargs):
# 钩子函数
def on_startup(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
def on_shutdown(self, func: LIFESPAN_FUNC) -> LIFESPAN_FUNC:
@classmethod
def on_bot_connect(cls, func: T_BotConnectionHook) -> T_BotConnectionHook:
@classmethod
def on_bot_disconnect(cls, func: T_BotDisconnectionHook) -> T_BotDisconnectionHook:
# 在Adapter中 def bot_connect(self, bot: Bot) -> None: 被使用
def _bot_connect(self, bot: "Bot") -> None:
# 在Adapter中 def bot_disconnect(self, bot: Bot) -> None: 被使用
def _bot_disconnect(self, bot: "Bot") -> None:
async def _cleanup(self) -> None:
- adapter使用register_adapter注册到_adapters。
_bot_connect
和_bot_disconnect
在adapter中被调用。
在Driver中有四个钩子函数,按照类型注解可以分为两类
-
on_bot_connect, on_bot_disconnect
他们的类型注解为 T_BotConnectionHook 和 T_BotDisconnectionHook
T_BotConnectionHook: TypeAlias = _DependentCallable[t.Any] T_BotDisconnectionHook: TypeAlias = _DependentCallable[t.Any] _DependentCallable: TypeAlias = t.Union[ t.Callable[..., T], t.Callable[..., t.Awaitable[T]] ]
从类型注解上看,函数接受一个任意的同步/异步函数,返回T值或t.Awaitable[T]。但实际上,_DependentCallable[t.Any]在Nonebot中作为依赖注入容器使用,根据官网文档,可以注入Bot对象。
==这两个函数的具体实现会在后面分析依赖注入的时候说到,
敬请期待== -
on_startup,on_shutdown
他们的类型注解为LIFESPAN_FUNC
SYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Any] ASYNC_LIFESPAN_FUNC: TypeAlias = Callable[[], Awaitable[Any]] LIFESPAN_FUNC: TypeAlias = Union[SYNC_LIFESPAN_FUNC, ASYNC_LIFESPAN_FUNC]
从类型注解来看,函数不接受任何内容,返回Any或Awaitable[Any]。这里没有任何依赖注入,只是简单的传入函数
ref: 钩子函数 | NoneBot
driver.run()之后,Nonebot便开始启动。再以"最简单"的None驱动器开始
继承关系: NoneDriver 继承 abc.abstractMethod (显而易见)
[abc基类中] driver.run() -> (执行基类的)self.on_shutdown(self._cleanup) 中注册清理函数 ->
[NoneDriver类中] 事件循环执行 self._serve()
NoneDriver类中具体的serve函数:
async def _serve(self):
self._install_signal_handlers()
await self._startup() -> 执行所有await self._lifespan.startup()的函数
# should_exit正常在收到退出信号后才为True,默认情况下是False
# 如果是强制退出,则不会取消所有的Task
if self.should_exit.is_set(): -> 决定是否退出,以及是否强制退出
return
# 从这块开始,监听退出
await self._main_loop() -> await self.should_exit.wait() 等待退出信号
await self._shutdown() -> 执行所有await self._lifespan.shutdown()的函数,并取消所有事件
具体的shutdown部分:
async def _shutdown(self):
logger.info("Shutting down")
logger.info("Waiting for application shutdown.")
try:
await self._lifespan.shutdown()
except Exception as e:
logger.opt(colors=True, exception=e).error(
"<r><bg #f8bbd0>Error when running shutdown function. "
"Ignored!</bg #f8bbd0></r>"
)
for task in asyncio.all_tasks():
if task is not asyncio.current_task() and not task.done():
task.cancel()
await asyncio.sleep(0.1)
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
if tasks and not self.force_exit:
logger.info("Waiting for tasks to finish. (CTRL+C to force quit)")
while tasks and not self.force_exit:
await asyncio.sleep(0.1)
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
logger.info("Application shutdown complete.")
loop = asyncio.get_event_loop()
loop.stop()
传统的FastAPIDriver反而没有这样复杂。生命周期部分只有注入的_lifespan_manager
class Driver(BaseDriver, ASGIMixin):
"""FastAPI 驱动框架。"""
def __init__(self, env: Env, config: NoneBotConfig):
super().__init__(env, config)
self.fastapi_config: Config = type_validate_python(Config, model_dump(config))
self._server_app = FastAPI(
lifespan=self._lifespan_manager, # 注意到这里注入了框架的异步上下文管理器_lifespan_manager
openapi_url=self.fastapi_config.fastapi_openapi_url,
docs_url=self.fastapi_config.fastapi_docs_url,
redoc_url=self.fastapi_config.fastapi_redoc_url,
**self.fastapi_config.fastapi_extra,
)
@contextlib.asynccontextmanager
# 框架的异步上下文管理器
async def _lifespan_manager(self, app: FastAPI):
await self._lifespan.startup()
try:
yield
finally:
await self._lifespan.shutdown()
有趣的是,在_Lifespan类(nonebot/internal/driver/_lifespan.py
)中(准确的说是 nonebot.utils)也包含了同步函数包装为异步的工具方法。
而_Lifespan类在实际的Nonebot使用中通过@driver.xxx的装饰器的形式,能让用户以钩子函数的形式注入nonebot2的生命周期
至此,nonebot/drivers + nonebot/internal/driver 文件夹下除了model.py就基本看完了。
nonebot/internal/driver/model.py下似乎是一些小型模型的抽象,例如:
class Request: class Response: class WebSocket(abc.ABC): class Cookies(MutableMapping):
@dataclass
class HTTPServerSetup:
@dataclass
class WebSocketServerSetup:
==有空回头看。==
由于Adapter是与Driver耦合的,我们现在来看看Adapter
基类Adapter具有和Driver(lifespan_on_ready钩子,bot_connect, bot_disconnect 钩子, 网络交互)交互的接口,Adapter传入Driver进行初始化,同时存储了Bot实例
值得一提的是,websocket函数使用了异步上下文管理器,比起传统的 __aenter__
+ __aexit__
构造具有简洁的优势。
@asynccontextmanager
async def websocket(self, setup: Request) -> AsyncGenerator[WebSocket, None]:
"""建立一个 WebSocket 客户端连接请求"""
if not isinstance(self.driver, WebSocketClientMixin):
raise TypeError("Current driver does not support websocket client")
async with self.driver.websocket(setup) as ws:
yield ws
同时,子适配器需要实现函数:
async def _call_api(self, bot: Bot, api: str, **data: Any) -> Any:
这个函数是Bot类中Bot.call_api的具体调用函数,同时也是文档推荐的Bot.send的实际调用方法
我们接下来看下ob11中Adapter的具体实现:
首先,需要在driver初始化后,使用driver.register_adapter(xxxAdapter)
来注册adapter。
注册函数中包含:
self._adapters[name] = adapter(self, **kwargs)
看下ob11适配器的__init__
@override
def __init__(self, driver: Driver, **kwargs: Any):
super().__init__(driver, **kwargs)
self.onebot_config: Config = get_plugin_config(Config)
"""OneBot V11 配置"""
self.connections: Dict[str, WebSocket] = {}
self.tasks: List["asyncio.Task"] = []
self._setup()
def _setup(self) -> None:
if isinstance(self.driver, ASGIMixin):
... # 注册路由到http及ws,即正反向http+反向ws
if self.onebot_config.onebot_ws_urls:
if not isinstance(self.driver, WebSocketClientMixin):
# driver未实现WebSocketClientMixin,无法完成正向ws,忽略
log(
"WARNING",
(
f"Current driver {self.config.driver} does not support "
"websocket client connections! Ignored"
),
)
else:
# 可以实现正向ws
self.on_ready(self._start_forward) # 向on_ready注册正向ws
self.driver.on_shutdown(self._stop_forward) # on_shutdown时停止正向ws
self._setup()
注册了相关路由,正向server,以及处理他们的函数
async def _handle_http(self, request: Request) -> Response: # -> 处理http
async def _handle_ws(self, websocket: WebSocket) -> None: # -> 处理反向ws
async def _start_forward(self) -> None: # 开启正向ws server
async def _stop_forward(self) -> None: # 关闭正向ws server
# _start_forward开启一个新的task去存储具体的_forward_ws,并把task存在self.tasks,在_stop_forward时负责cancel这些task
async def _forward_ws(self, url: URL) -> None # 处理正向ws
从收消息到解析消息到处理消息,摘抄反向ws的处理部分:
while True:
data = await websocket.receive() # 裸消息段,str
json_data = json.loads(data) # 解析为json的消息段,dict
if event := self.json_to_event(json_data): # *把json paste成nb的event
asyncio.create_task(bot.handle_event(event)) # 开启新的task,去处理这个event
ℹ️ 注解提示:
由于本人比较懒,把整个调用过程展示这个本应该使用结构图展示的部分直接以赛博打端点的方式展示了
>[B]
为断点标记,下面的注释为这个断点处在ipython终端执行的语句展示
nonebot/adapters/onebot/collator.py
负责处理json信息到事件模型的转换和匹配
注意到nonebot/adapters/onebot/v11/adapter.py
的最前面有:
from nonebot.adapters.onebot.collator import Collator
RECONNECT_INTERVAL = 3.0
DEFAULT_MODELS: List[Type[Event]] = []
for model_name in dir(event):
model = getattr(event, model_name)
if not inspect.isclass(model) or not issubclass(model, Event):
continue
DEFAULT_MODELS.append(model)
class Adapter(BaseAdapter):
# 0.0 初始化Collator
event_models = Collator(
# self.name
"OneBot V11",
# self.models
DEFAULT_MODELS, # ob11协议所有符合pydamic的event类
# self.keys
(
"post_type", # 第一层前缀: 所有事件中一定有的post_type
("message_type", "notice_type", "request_type", "meta_event_type"), # 第二层前缀: 所有事件中一定有的post_type的子类型
"sub_type", # 第三层前缀: 部分事件中有的sub_type (事件子类型)
),
)
event的层次分类ref: onebot-11/event at master · botuniverse/onebot-11 (github.com)
(ob11原文档官方抽象了post_type和post_type的子类型,即第一层和第二层。至于第三层则是ob11适配器自己搞的抽象)
在类的初始化阶段,我们完成了event_models的初始化
==这里是Collator类的具体实现==
E = TypeVar("E", bound=Event)
SEPARATOR = "/"
class Collator(Generic[E]):
# 0.1 初始化字典树
def __init__(self, name: str, models: List[Type[E]], keys: Tuple[Union[str, Tuple[str, ...]], ...],):
self.name = name
self.logger = logger_wrapper(self.name)
self.models = models
self.keys = keys
self.tree = StringTrie(separator=SEPARATOR)
self._refresh_tree()
# 0.2 向字典树添加model,被def add_custom_model(cls, *model: Type[Event]) -> None: 调用
def add_model(self, *model: Type[E]):
self.models.extend(model)
self._refresh_tree()
# 2.2 获取一组pydamic model,被def get_event_model(cls, data: Dict[str, Any]) -> Generator[Type[Event], None, None]:调用
def get_model(self, data: Dict[str, Any]) -> List[Type[E]]:
# 2.3.1 从dict(即client上报原始数据)得到key(前缀分隔)
key = self._key_from_dict(data)
# 2.4 从前缀分割形式的key还原出对应的model
>[B] return [model.value for model in self.tree.prefixes(key)][::-1]
"""
>>> key
'/message/group/normal'
>>> [model.value for model in self.tree.prefixes(key)][::-1]
[<class 'nonebot.adapters.onebot.v11.event.GroupMessageEvent'>, <class 'nonebot.adapters.onebot.v11.event.MessageEvent'>, <class 'nonebot.adapters.onebot.v11.event.Event'>]
"""
# 注意到,这里还原的model以继承顺序反向返回
# 这样可以在后面进行fallback的model匹配
# 0.3 刷新字典树
def _refresh_tree(self):
self.tree.clear()
for model in self.models:
# 0.4 从model中paste出这个model对应的前缀分割格式的keys字符串
key = self._key_from_model(model)
if key in self.tree:
self.logger("DEBUG", f'Model for key "{key}" {self.tree[key]} is overridden by {model}',)
# 存入tree
self.tree[key] = model
# 2.3.1 从dict(即client上报原始数据)得到key(前缀分隔)数组
def _key_from_dict(self, data: Dict[str, Any]) -> str:
keys: List[Optional[str]] = []
for key in self.keys:
if isinstance(key, tuple):
fields = list(filter(None, (data.get(k, None) for k in key)))
if len(fields) > 1:
raise ValueError(f"Invalid data with incorrect fields: {fields}")
field = fields[0] if fields else None
else:
field = data.get(key)
>[B] keys.append(field)
"""
>>> keys
['message', 'group', 'normal']
"""
# 2.3.2 从keys数组得到前缀分割格式的keys字符串
return self._generate_key(keys)
# 0.4 从model中paste出这个model对应的前缀分割格式的keys字符串
def _key_from_model(self, model: Type[E]) -> str:
keys: List[Optional[str]] = []
# 在这里,self.keys就是固定的
# 从传入的eventModel (model)中提取出前缀分割格式的keys字符串
for key in self.keys:
# 一堆子类型中("message_type", "notice_type", "request_type", "meta_event_type")只能有一个作为真正的field
if isinstance(key, tuple):
fields = list(
# 0.5 从model(具体的event类)和 k(类型key)获得fields
filter(None, (self._get_model_field(model, k) for k in key))
)
if len(fields) > 1:
raise ValueError(f"Invalid model with incorrect fields: {fields}")
field = fields[0] if fields else None
else:
# 0.5 从model(具体的event类)和 key(类型key)获得fields
field = self._get_model_field(model, key)
# 0.6 从fields获取key(类型val)
# 似乎 `(field and self._get_literal_field_default(field))` 处的意图纯粹是:
# 1. 若field不为None,从field中解析相应的key(取and后面)
# 2. 若field为None,则key为None(取and前面)
keys.append(field and self._get_literal_field_default(field))
return self._generate_key(keys)
"""
下面是我的self.keys:
(
"post_type", # 第一层前缀: 所有事件中一定有的post_type
("message_type", "notice_type", "request_type", "meta_event_type"), # 第二层前缀: 所有事件中一定有的post_type的子类型
"sub_type", # 第三层前缀: 部分事件中有的sub_type (事件子类型)
),
我传入的model: <class 'nonebot.adapters.onebot.v11.event.FriendRecallNoticeEvent'>
结束后,得到的keys数组:['notice', 'friend_recall', None]
可以看出,我根据我的self.keys,从model中paste出对应的keys(就是self.keys下的value)
对应关系如下:
"post_type" -> notice
("message_type", "notice_type", "request_type", "meta_event_type") -> friend_recall
"sub_type" -> None
"""
# 2.3.2 从keys数组得到前缀分割格式的keys字符串
def _generate_key(self, keys: List[Optional[str]]) -> str:
# 2.3.3 检查 keys 列表是否在第一个 None 或空字符串出现后,其余都是 None 或空字符串
# 确保能得到有效的keys字符串
# 例如 [None, 1, None] -> raise error
if not self._check_key_list(keys):
raise ValueError(
"Invalid model with incorrect prefix "
f"keys: {dict(zip(self.keys, keys))}"
)
tree_keys = [""] + list(filter(None, keys))
>[B] return SEPARATOR.join(tree_keys)
"""
>>> SEPARATOR.join(tree_keys)
'/message/group/normal'
"""
# 2.3.3 检查 keys 列表是否在第一个 None 或空字符串出现后,其余都是 None 或空字符串
def _check_key_list(self, keys: List[Optional[str]]) -> bool:
truthy = tuple(map(bool, keys))
return all(truthy) or not any(truthy[truthy.index(False) :])
# 0.5 从model(具体的event类)和 key(前缀)获得fields
def _get_model_field(self, model: Type[E], field: str) -> Optional[ModelField]:
>[B] return next((f for f in model_fields(model) if f.name == field), None)
"""
>>> model
<class 'nonebot.adapters.onebot.v11.event.FriendRecallNoticeEvent'>
>>> field
'post_type'
>>> model_fields(model)
[ModelField(name='time', annotation=<class 'int'>, field_info=FieldInfo(annotation=int, required=True)), ModelField(name='self_id', annotation=<class 'int'>, field_info=FieldInfo(annotation=int, required=True)), ModelField(name='post_type', annotation=typing.Literal['notice'], field_info=FieldInfo(annotation=Literal['notice'], required=True)), ModelField(name='notice_type', annotation=typing.Literal['friend_recall'], field_info=FieldInfo(annotation=Literal['friend_recall'], required=True)), ModelField(name='user_id', annotation=<class 'int'>, field_info=FieldInfo(annotation=int, required=True)), ModelField(name='message_id', annotation=<class 'int'>, field_info=FieldInfo(annotation=int, required=True))]
>>> [f.name for f in model_fields(model)]
['time', 'self_id', 'post_type', 'notice_type', 'user_id', 'message_id']
>>> next((f for f in model_fields(model) if f.name == field), None)
ModelField(name='post_type', annotation=typing.Literal['notice'],field_info=FieldInfo(annotation=Literal['notice'], required=True))
"""
def _get_literal_field_default(self, field: ModelField) -> Optional[str]:
if not origin_is_literal(get_origin(field.annotation)):
return
allowed_values = all_literal_values(field.annotation)
if len(allowed_values) > 1:
return
return allowed_values[0]
def _key_from_model(self, model: Type[E]) -> str:
部分结合着上面函数的注释+这里看下:
在接收到client的消息后,使用字典树不断地fallback进行匹配,最终将原始信息paste成对应的pydamic中的model
@classmethod
def json_to_event(cls, json_data: Any) -> Optional[Event]:
"""将 json 数据转换为 Event 对象。
如果为 API 调用返回数据且提供了 Event 对应 Bot,则将数据存入 ResultStore。
参数:
json_data: json 数据
self_id: 当前 Event 对应的 Bot
返回:
Event 对象,如果解析失败或为 API 调用返回数据,则返回 None
"""
>[B] if not isinstance(json_data, dict):
return None
"""
>>> json_data
{'self_id': 0, 'user_id': 1, 'time': 1717886097, 'message_id': -2147212788, 'real_id': -2147212788, 'message_type': 'group', 'sender': {'user_id': 2, 'nickname': 'mygo', 'card': '', 'role': 'member'}, 'raw_message': '玩原神玩的', 'font': 14, 'sub_type': 'normal', 'message': [{'data': {'text': '玩原神玩的'}, 'type': 'text'}], 'message_format': 'array', 'post_type': 'message', 'group_id': 948186115}
"""
# 0. 这是一个全新的event(连post_type都没有,参照上文及ob11标准,post_type是ob11事件中必要元素)
if "post_type" not in json_data:
cls._result_store.add_result(json_data) # 向result store存储这个result
return # 直接返回?
try:
# 1. 调用 cls.get_event_model -> cls.event_models.get_model 进行优先匹配
for model in cls.get_event_model(json_data):
try:
# 3. 如果pydamic模型可以成功解析,直接break,得到了最佳解析模型
# 如果pydamic模型不可以解析,fallback到其上一层
>[B] event = type_validate_python(model, json_data)
break
"""
>>> type_validate_python(model, json_data)
GroupMessageEvent(time=1717886097, self_id=0, post_type='message', sub_type='normal', user_id=1, message_type='group', message_id=-2147212788, message=[MessageSegment(type='text', data={'text': '玩原神玩的'})], original_message=[MessageSegment(type='text', data={'text': '玩原神玩的'})], raw_message='玩原神玩的', font=14, sender=Sender(user_id=2, nickname='mygo', sex=None, age=None, card='', area=None, level=None, role='member', title=None), to_me=False, reply=None, group_id=3, anonymous=None, real_id=-2147212788, message_format='array')
"""
except Exception as e:
log("DEBUG", "Event Parser Error", e)
else:
# 最终的fallback,直接回退到Event基类
event = type_validate_python(Event, json_data)
return event
except Exception as e:
log(
"ERROR",
"<r><bg #f8bbd0>Failed to parse event. "
f"Raw: {escape_tag(str(json_data))}</bg #f8bbd0></r>",
e,
)
# 没人调这个函数?
@classmethod
def add_custom_model(cls, *model: Type[Event]) -> None:
"""插入或覆盖一个自定义的 Event 类型。
参数:
model: 自定义的 Event 类型
"""
cls.event_models.add_model(*model)
# 2.1 惰性迭代获取到的这组pydamic model
@classmethod
def get_event_model(cls, data: Dict[str, Any]) -> Generator[Type[Event], None, None]:
"""根据事件获取对应 `Event Model` 及 `FallBack Event Model` 列表。"""
yield from cls.event_models.get_model(data)
对于整个(ob11)Event解析的设计,我倒没什么好说的,因为
我太菜了,我能看懂就已经很不错了
adapter中的 _call_api
是 adapter, bot, 乃至matcher(通过调用bot.send) 向协议端请求的唯一函数
由于ob11实现规定,每次api调用都有一个指定的echo字段,所以我们需要使用_result_store
管理每次请求
ref: onebot-11/communication/ws.md at master · botuniverse/onebot-11 (github.com)
@override
async def _call_api(self, bot: Bot, api: str, **data: Any) -> Any:
websocket = self.connections.get(bot.self_id, None)
timeout: float = data.get("_timeout", self.config.api_timeout)
log("DEBUG", f"Calling API <y>{api}</y>")
if websocket:
seq = self._result_store.get_seq() # 每get_seq()一次,自增一次获取一个新的seq
print(seq)
json_data = json.dumps(
{"action": api, "params": data, "echo": str(seq)},
cls=DataclassEncoder,
)
await websocket.send(json_data)
try:
# 从_result_store.fetch中从这个seq中等待结果
# 这个结果应该是在json_to_event中添加的,不过为什么呢
return handle_api_result(await self._result_store.fetch(seq, timeout))
except asyncio.TimeoutError:
raise NetworkError(f"WebSocket call api {api} timeout") from None
elif isinstance(self.driver, HTTPClientMixin):
...
@classmethod
def json_to_event(cls, json_data: Any) -> Optional[Event]:
"""将 json 数据转换为 Event 对象。
如果为 API 调用返回数据且提供了 Event 对应 Bot,则将数据存入 ResultStore。
参数:
json_data: json 数据
self_id: 当前 Event 对应的 Bot
返回:
Event 对象,如果解析失败或为 API 调用返回数据,则返回 None
"""
if not isinstance(json_data, dict):
return None
if "post_type" not in json_data:
cls._result_store.add_result(json_data)
return
class ResultStore:
def __init__(self) -> None:
self._seq: int = 1
self._futures: Dict[int, asyncio.Future] = {}
@property
def current_seq(self) -> int:
return self._seq
def get_seq(self) -> int:
s = self._seq
self._seq = (self._seq + 1) % sys.maxsize
return s
def add_result(self, result: Dict[str, Any]):
echo = result.get("echo")
if isinstance(echo, str) and echo.isdecimal():
if future := self._futures.get(int(echo)):
future.set_result(result)
async def fetch(self, seq: int, timeout: Optional[float]) -> Dict[str, Any]:
future = asyncio.get_event_loop().create_future()
self._futures[seq] = future
try:
return await asyncio.wait_for(future, timeout)
finally:
del self._futures[seq]
这样看不算直观,我们可以把websocket中send和receive分别打上log,可以看到这一过程中的交互:
06-08 14:41:10 [TRACE] nonebot | OneBot V11 | Sending data to WebSocket: {"action": "get_msg", "params": {"message_id": -2147256444}, "echo": "1"}
06-08 14:41:10 [TRACE] nonebot | OneBot V11 | Received data from WebSocket: {'status': 'ok', 'retcode': 0, 'data': {'self_id': 114514, 'user_id': 1919810, 'time': 1717827867, 'message_id': -2147256444, 'real_id': -2147256444, 'message_type': 'group', 'sender': {'user_id': 1919810, 'nickname': 'xianbei', 'card': '', 'role': 'member'}, 'raw_message': '[CQ:at,qq=12345] 来玩archlinux', 'font': 14, 'sub_type': 'normal', 'message': [{'data': {'qq': '12345'}, 'type': 'at'}, {'data': {'text': ' 来玩archlinux'}, 'type': 'text'}], 'message_format': 'array', 'post_type': 'message', 'group_id': 12345678}, 'message': '', 'wording': '', 'echo': '1'}
整个流程大致是:
-
Bot端调用
_call_api
-
get_seq
生成一个seq,作为发送客户端data的echo号 -
发送给客户端
-
return handle_api_result(await self._result_store.fetch(seq, timeout))
其中
async def fetch(self, seq: int, timeout: Optional[float]) -> Dict[str, Any]: future = asyncio.get_event_loop().create_future() self._futures[seq] = future try: return await asyncio.wait_for(future, timeout) finally: del self._futures[seq]
会等待这个seq对应的消息被传入(见函数
add_result
)后,且等待时间为timeout
时长 -
监听客户端ws的地方在收到(可能是这个echo对应的message后)交由
json_to_event
,如果这个message恰好是回复_call_api
的(即"post_type" not in json_data:
),那么我就会cls._result_store.add_result(json_data)
-
add_result
之后(或等待Timeout时长后)协程等待结束,整个流程结束,还会del self._futures[seq]
同理,HTTP的调用(_call_api
)流程类似于ws。
Note
本部分内容有可能会被拆分到新的章节
这部分源于在看ob11的Bot部分时,往前翻了下blame,然后发现了v2.0.0a16的Nonebot和现在还不一样,便兴起而写
为什么要和v2.0.0a16相比?v2.0.016恰好是ob11适配器(原cqhttp适配器)从Nonebot2仓库分离后的第一次提交(ref: adapter-onebot/nonebot/adapters/onebot/v11/bot.py at 12bf9bc11669bd09c08500f2440fe4ee26d4a95b · nonebot/adapter-onebot (github.com),对应的Nonebot最近的stable版本号
相较于现在(v2.3.1,本节下略)的Nonebot,意外的发现了v2.0.0a16的官方文档还介绍了Nonebot的部分细节 -> 深入 | NoneBot (deploy-preview-588--nonebot2.netlify.app) (当然这都是题外话了
于是便看了下v2.0.0a16的Nonebot文档,相较于现在的Nonebot,整体处理逻辑看上去并无大改动,但是有些改变的细节值得一提。
我们不难注意到,v2.0.0a16的Adapter的签名是这样的:(ref: https://github.com/nonebot/nonebot2/blob/v2.0.0a16/nonebot/drivers/__init__.py#L28)
_adapters: Dict[str, Type["Bot"]] = {}
我们再来看看v2.0.0a16的Driver:
可以看到,这个时候的Driver还不能实现插拔自由,每一个子Driver完整的继承基类Driver
从v2.0.0a16的Driver到现在,Driver, Adapter, Bot进一步实现解耦,同时Driver的功能性得到了大幅提升
-
v2.0.0a16的Driver不仅是Driver容器,而且还负责管理具体的连接以及管理Bot实例,而现在的Driver仅仅是一个抽象的连接层,只负责对Adapter提供连接的接口,供Adapter使用。管理Bot实例的任务则交给了Adapter,这使得你可以在
-
v2.0.0a16的Driver只抽象出了正向ws,反向ws以及HTTP轮询(注意,是http轮询,看来彼时的nb也很好的贴合了ob11(x )三种方式,对于webhook等连接还要在适配器里额外引入httpx,
(而且还是无session复用的httpx)ref:
因为Bot是与Adapter所耦合的,接下来,我们就看看Bot部分
在基类Bot中,传入adapter进行初始化。Bot具有自己的on_calling_api和on_called_api钩子函数,同时具有call_api和send两个自身方法
call_api 函数具有两个钩子函数(on_calling_api和on_called_api),然后调用其加载的adapter中的_call_api函数,具体实现如下:
==待补全:所有钩子函数的具体实现,看来钩子函数也不是那样简单的==
注意到,_calling_api_hook
和_called_api_hook
的类型注解表明他们只能进行有限的依赖注入,参见:钩子函数 #平台接口调用钩子
这个有限的依赖注入指:
-
传参必须完整(例如T_CallingAPIHook比如同时传入
bot: Bot, api: str, data: Dict[str, Any]
)相较于使用依赖注入容器的
_DependentCallable
,_calling_api_hook
和_called_api_hook
的传参必须完整才可以使用06-10 14:55:54 [WARNING] nonebot | OneBot V11 | Error when getting message reply info: TypeError('handle_api_call_2() takes 1 positional argument but 3 were given')
下面是具体的实现:
# api hooks
T_CallingAPIHook: TypeAlias = t.Callable[
["Bot", str, dict[str, t.Any]], t.Awaitable[t.Any]
]
T_CalledAPIHook: TypeAlias = t.Callable[
["Bot", t.Optional[Exception], str, dict[str, t.Any], t.Any], t.Awaitable[t.Any]
]
class Bot(abc.ABC):
# 注意这里的类型注解
_calling_api_hook: ClassVar[set[T_CallingAPIHook]] = set()
_called_api_hook: ClassVar[set[T_CalledAPIHook]] = set()
async def call_api(self, api: str, **data: Any) -> Any:
result: Any = None
skip_calling_api: bool = False
exception: Optional[Exception] = None
if coros := [hook(self, api, data) for hook in self._calling_api_hook]:
try:
logger.debug("Running CallingAPI hooks...")
await asyncio.gather(*coros)
except MockApiException as e:
skip_calling_api = True
result = e.result
logger.debug(
f"Calling API {api} is cancelled. Return {result} instead."
)
except Exception as e:
logger.opt(colors=True, exception=e).error(
"<r><bg #f8bbd0>Error when running CallingAPI hook. "
"Running cancelled!</bg #f8bbd0></r>"
)
if not skip_calling_api:
try:
result = await self.adapter._call_api(self, api, **data)
except Exception as e:
exception = e
if coros := [
hook(self, exception, api, data, result) for hook in self._called_api_hook
]:
try:
logger.debug("Running CalledAPI hooks...")
await asyncio.gather(*coros)
except MockApiException as e:
# mock api result
result = e.result
# ignore exception
exception = None
logger.debug(
f"Calling API {api} result is mocked. Return {result} instead."
)
except Exception as e:
logger.opt(colors=True, exception=e).error(
"<r><bg #f8bbd0>Error when running CalledAPI hook. "
"Running cancelled!</bg #f8bbd0></r>"
)
if exception:
raise exception
return result
具体的实现倒没有什么特别的地方,(相较于带高级依赖注入的Driver来讲)比较常规
ref: 钩子函数#平台接口调用钩子
先看下这两个函数的接口:
async def call_api(self, api: str, **data: Any) -> Any:
...
@abc.abstractmethod
async def send(self, event: "Event", message: Union[str, "Message", "MessageSegment"], **kwargs: Any,) -> Any:
...
-
call_api只需要指定api和data就好了,而send需要考虑的就多了。send需要指定event和message segment,而call_api就很简单,直接发送裸数据
-
call_api有钩子函数,而send没有
-
call_api 使用
__getattr__
的方式被注入在了Bot类中:class _ApiCall(Protocol): async def __call__(self, **kwargs: Any) -> Any: ... def __getattr__(self, name: str) -> "_ApiCall": if name.startswith("__") and name.endswith("__"): raise AttributeError( f"'{self.__class__.__name__}' object has no attribute '{name}'" ) return partial(self.call_api, name)
-
send 会被在Matcher中使用(诸如send, finish, reject)
-
在ob11实现中,send实际上调用了call_api ,而官方文档也是这样写的(send接口实际应该调用call_api)
ref: 使用平台接口 | NoneBot
Note
TODO