|
# coding: utf-8 |
|
import io |
|
import os |
|
import re |
|
import logging |
|
import string |
|
from urllib.parse import quote |
|
import urllib.request |
|
from tempfile import NamedTemporaryFile |
|
from typing import Optional |
|
from ruamel.yaml import YAML |
|
|
|
from ehforwarderbot import EFBMiddleware, EFBMsg, EFBStatus, \ |
|
EFBChat, coordinator, EFBChannel, utils |
|
from ehforwarderbot.message import EFBMsgSubstitutions |
|
# from link_preview import link_preview |
|
|
|
class NoticeMiddleware(EFBMiddleware): |
|
""" |
|
EFB Middleware - NoticeMiddleware |
|
An extension for link preview. |
|
""" |
|
|
|
middleware_id = "notice.NoticeMiddleware" |
|
middleware_name = "Notice Middleware" |
|
__version__: str = '1.0.0' |
|
|
|
logger: logging.Logger = logging.getLogger("plugins.%s" % middleware_id) |
|
|
|
|
|
def __init__(self, instance_id=None): |
|
super().__init__() |
|
|
|
if hasattr(coordinator, "master") and isinstance(coordinator.master, EFBChannel): |
|
self.admin = coordinator.master.config['admins'][0] |
|
|
|
if hasattr(coordinator, "slaves") and coordinator.slaves['blueset.wechat']: |
|
self.channel_ews = coordinator.slaves['blueset.wechat'] |
|
|
|
self.notices_pattern = None |
|
self.tags_pattern = None |
|
self.load_config() |
|
|
|
def load_config(self): |
|
""" |
|
Load configuration from path specified by the framework. |
|
|
|
Configuration file is in YAML format. |
|
""" |
|
config_path = utils.get_config_path(self.middleware_id) |
|
if not config_path.exists(): |
|
return |
|
|
|
with config_path.open() as f: |
|
data = YAML().load(f) |
|
|
|
# Verify configuration |
|
notices = data.get("notices", []) |
|
if notices and not isinstance(notices, list): |
|
raise ValueError("notices are expected to be a list, but {} is found.".format(notices)) |
|
if len(notices) > 0: |
|
self.notices_pattern = re.compile('|'.join(notices)) |
|
|
|
tags = data.get("tags", []) |
|
if tags and not isinstance(tags, list): |
|
raise ValueError("tags are expected to be a list, but {} is found.".format(tags)) |
|
if len(tags) > 0: |
|
self.tagMap = {} |
|
_tag = [] |
|
for tag in tags: |
|
t = tag.split(':', 1) |
|
if len(t) == 2: |
|
self.tagMap[t[0]] = t[1] |
|
_tag.append(t[0]) |
|
|
|
self.tags_pattern = re.compile('|'.join(_tag)) |
|
|
|
def sent_by_master(self, message: EFBMsg) -> bool: |
|
author = message.author |
|
if author and author.module_id and author.module_id == 'blueset.telegram': |
|
return True |
|
else: |
|
return False |
|
|
|
def process_message(self, message: EFBMsg) -> Optional[EFBMsg]: |
|
""" |
|
Process a message with middleware |
|
Args: |
|
message (:obj:`.EFBMsg`): Message object to process |
|
Returns: |
|
Optional[:obj:`.EFBMsg`]: Processed message or None if discarded. |
|
""" |
|
|
|
|
|
# self.logger.log(99, "Received message from NoticeMiddleware: %s", message.__dict__) |
|
|
|
if self.sent_by_master(message): |
|
return message |
|
|
|
# 去重 |
|
tags = {} |
|
|
|
attributes = message.attributes |
|
if getattr(attributes, 'title', None) or getattr(attributes, 'description', None): |
|
search_text = f"{getattr(attributes, 'title', '')}{getattr(attributes, 'description', '')}" |
|
if self.notices_pattern: |
|
# self.logger.log(99, "Received message from NoticeMiddleware: %s", attributes.__dict__) |
|
result = self.notices_pattern.findall(search_text) |
|
if len(result) > 0: |
|
attributes.image = 'tg://user?id=%s' % self.admin |
|
# self.logger.log(99, "Received message from NoticeMiddleware: %s", message.__repr__) |
|
|
|
if self.tags_pattern: |
|
result = set(self.tags_pattern.findall(search_text)) |
|
if len(result) > 0: |
|
append = '\n\n' |
|
for item in result: |
|
if not tags.get(item): |
|
tag = self.tagMap.get(item, item) |
|
tags[tag] = True |
|
append += '#%s ' % tag |
|
attributes.description = getattr(attributes, 'description', '').strip() + append.strip(' ') |
|
|
|
text = message.text |
|
if text: |
|
if self.tags_pattern: |
|
result = set(self.tags_pattern.findall(text)) |
|
if len(result) > 0: |
|
append = '\n\n' |
|
for item in result: |
|
if not tags.get(item): |
|
tag = self.tagMap.get(item, item) |
|
tags[tag] = True |
|
append += '#%s ' % tag |
|
message.text = message.text.strip() + append.strip(' ') |
|
|
|
if self.notices_pattern: |
|
result = self.notices_pattern.findall(text) |
|
if len(result) > 0: |
|
message.text = '🔊 ' + message.text |
|
message.substitutions = EFBMsgSubstitutions({ |
|
(0, 1): EFBChat(self.channel_ews).self() |
|
}) |
|
|
|
return message |