-
-
Save LinkinAndy/4f987caad86649eae7f8bb3511321685 to your computer and use it in GitHub Desktop.
响应好友请求 / 自动聊天 / 限制频率 / 邀请入群 / 远程群管理 / 新人欢迎消息 / 关键词问答 / 发心跳 / 远程命令 / 远程执行代码
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# coding: utf-8 | |
import datetime | |
import logging | |
import re | |
import sys | |
import time | |
from functools import wraps | |
from pprint import pformat | |
from threading import Thread | |
""" | |
psutil 模块用于监控进程状态,例如内存占用情况,请自行安装: | |
pip3 install -U psutil | |
""" | |
import psutil | |
sys.path.insert(0, '..') | |
from wxpy import * | |
from wxpy.utils.misc import get_text_without_at_bot | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger() | |
# 下面的 console_qr 参数,请自行按需调整 | |
bot = Bot('bot.pkl', console_qr=-2) | |
# 定义远程管理员 (用于远程管理),使用备注名更安全 | |
remote_admin = ensure_one(bot.friends().search(remark_name='youfou')) | |
# 防止登错账号 | |
if bot.self == remote_admin: | |
raise ValueError('Wrong User!') | |
# 使用 wxid 找到需要管理的微信群 | |
bot.groups(True) | |
group_ids = ( | |
# wxpy 交流群 🐰 | |
'6411313640@chatroom', | |
# wxpy 交流群 🐱 | |
'6788356306@chatroom', | |
) | |
wxpy_groups = list() | |
for wxid in group_ids: | |
g = bot.groups().search(wxid=wxid)[0] | |
wxpy_groups.append(g) | |
# 初始化聊天机器人 | |
tuling = Tuling() | |
# 自动回答关键词 | |
kw_replies = { | |
'wxpy 项目主页:\ngithub.com/youfou/wxpy': ( | |
'项目', '主页', '官网', '网站', 'github', '地址', 'repo', '版本' | |
), | |
'wxpy 在线文档:\nwxpy.readthedocs.io': ( | |
'请问', '文档', '帮助', '怎么', '如何', '请教', '安装', '说明' | |
), | |
'必看: 常见问题 FAQ:\nwxpy.readthedocs.io/faq.html': ( | |
'faq', '常见', '问题', '问答', '什么' | |
) | |
} | |
# 新人入群的欢迎语 | |
welcome_text = '''\U0001F389 欢迎 @{} 的加入! | |
\U0001f603 请勿在本群使用机器人 | |
\U0001F4D6 提问前请看 t.cn/R6VkJDy''' | |
# 新人入群通知的匹配正则 | |
rp_new_member_name = ( | |
re.compile(r'^"(.+)"通过'), | |
re.compile(r'邀请"(.+)"加入'), | |
) | |
# 远程踢人命令: @<机器人> 移出 @<需要被移出的人> | |
rp_kick = re.compile(r'^@.+移出\s*@(.+?)(?:\u2005?\s*$)') | |
def update_groups(): | |
for _group in wxpy_groups: | |
_group.update_group(members_details=True) | |
remote_admin.send('{}: {}'.format(_group.name, len(_group))) | |
return True | |
process = psutil.Process() | |
def status_text(): | |
uptime = datetime.datetime.now() - datetime.datetime.fromtimestamp(process.create_time()) | |
memory_usage = process.memory_info().rss | |
return '{uptime}\n{memory}'.format( | |
uptime=str(uptime).split('.')[0], | |
memory='{:.2f} MB'.format(memory_usage / 1024 ** 2) | |
) | |
def send_status_text(): | |
return remote_admin.send(status_text()) | |
# 定时报告进程状态 | |
def heartbeat(): | |
while True: | |
time.sleep(600) | |
# noinspection PyBroadException | |
try: | |
send_status_text() | |
except: | |
logger.exception('failed to report heartbeat:') | |
heartbeat_thread = Thread(target=heartbeat, daemon=True, name='heartbeat') | |
heartbeat_thread.start() | |
def remote_eval(source): | |
try: | |
ret = eval(source, globals()) | |
except (SyntaxError, NameError): | |
return | |
except Exception as e: | |
ret = e | |
remote_admin.send(pformat(ret)) | |
return True | |
# 远程命令 (单独发给机器人的消息) | |
remote_orders = { | |
'groups': update_groups, | |
'status': send_status_text, | |
} | |
# 尝试直接将 msg.text 作为 Python 代码执行 | |
# 若遇语法错误,则按上述字典执行匹配的函数 | |
# 若有执行以上任何内容,则返回 True | |
def exec_remote_order(msg): | |
if msg.sender == remote_admin: | |
order = remote_orders.get(msg.text.lower()) | |
if order: | |
order() | |
return True | |
else: | |
return remote_eval(msg.text) | |
def reply_by_keyword(msg): | |
for reply, keywords in kw_replies.items(): | |
for kw in keywords: | |
if kw in msg.text.lower(): | |
logger.info('reply by keyword: \n{}: "{}"\nreplied: "{}"'.format( | |
(msg.member or msg.chat).name, msg.text, reply)) | |
msg.reply(reply) | |
return reply | |
# 验证入群口令 | |
def valid(msg): | |
return 'wxpy' in msg.text.lower() | |
# 自动选择未满的群 | |
def get_group(): | |
wxpy_groups.sort(key=len, reverse=True) | |
for _group in wxpy_groups: | |
if len(_group) < 490: | |
return _group | |
else: | |
logger.warning('群都满啦!') | |
return wxpy_groups[-1] | |
# 邀请入群 | |
def invite(user): | |
joined = list() | |
for group in wxpy_groups: | |
if user in group: | |
joined.append(group) | |
if joined: | |
joined_nick_names = '\n'.join(map(lambda x: x.nick_name, joined)) | |
logger.info('{} is already in\n{}'.format(user, joined_nick_names)) | |
user.send('你已加入了\n{}'.format(joined_nick_names)) | |
else: | |
group = get_group() | |
logger.info('inviting {} to {}'.format(user, group)) | |
user.send('口令验证通过,开启群入口 [机智]') | |
group.add_members(user, use_invitation=True) | |
# 限制频率: 指定周期内超过消息条数,直接回复"[奸笑]" | |
def freq_limit(period_secs=10, limit_msgs=3): | |
def decorator(func): | |
@wraps(func) | |
def wrapped(msg): | |
now = datetime.datetime.now() | |
period = datetime.timedelta(seconds=period_secs) | |
recent_received = 0 | |
for m in msg.bot.messages[::-1]: | |
if m.sender == msg.sender: | |
if now - m.create_time > period: | |
break | |
recent_received += 1 | |
if recent_received > limit_msgs: | |
if not isinstance(msg.chat, Group) or msg.is_at: | |
return '[奸笑]' | |
return func(msg) | |
return wrapped | |
return decorator | |
def get_new_member_name(msg): | |
for rp in rp_new_member_name: | |
match = rp.search(msg.text) | |
if match: | |
return match.group(1) | |
def remote_kick(msg): | |
if msg.is_at and msg.type is TEXT: | |
match = rp_kick.search(msg.text) | |
if match: | |
if msg.member != remote_admin: | |
raise ValueError('Wrong admin: {}'.format(msg.member)) | |
name_to_kick = match.group(1) | |
try: | |
member_to_kick = ensure_one(msg.chat.search(name_to_kick)) | |
except ValueError as e: | |
logger.exception('remote_kick') | |
remote_admin.send('remote_kick: {}'.format(e)) | |
raise | |
if member_to_kick in (bot.self, remote_admin): | |
remote_admin.send('不能移出 {}'.format(member_to_kick.nick_name)) | |
raise ValueError('Wrong member to kick: {}'.format(member_to_kick)) | |
else: | |
member_to_kick.remove() | |
msg.chat.send('已移出 {}'.format(name_to_kick)) | |
return True | |
def semi_sync(msg, groups): | |
if msg.is_at: | |
msg.text = get_text_without_at_bot(msg) | |
if msg.text: | |
sync_message_in_groups( | |
msg, groups, suffix='↑隔壁消息↑回复请@机器人') | |
# 响应好友请求 | |
@bot.register(msg_types=FRIENDS) | |
def new_friends(msg): | |
logger.info('accepting {}'.format(msg.card)) | |
user = msg.card.accept() | |
if valid(msg): | |
invite(user) | |
else: | |
user.send('Hello {},你忘了填写加群口令,快回去找找口令吧'.format(user.name)) | |
# 响应好友消息,限制频率 | |
@bot.register(Friend, msg_types=TEXT) | |
@freq_limit() | |
def exist_friends(msg): | |
if valid(msg): | |
invite(msg.sender) | |
return | |
elif reply_by_keyword(msg): | |
return | |
tuling.do_reply(msg) | |
@bot.register(remote_admin, msg_types=TEXT) | |
def reply_remote_admin(msg): | |
""" | |
响应远程管理员 | |
内容解析方式优先级: | |
1. 尝试作为远程命令执行 (额外定义,一条命令对应一个函数) | |
2. 若不是远程命令,则尝试作为 Python 代码执行 (可执行大部分 Python 代码) | |
3. 若非可执行的 Python 代码,则作为普通聊天内容回复 | |
""" | |
# 上述的 1. 和 2. | |
if exec_remote_order(msg): | |
return | |
# 上述的 3. | |
return exist_friends(msg) | |
# 在其他群中回复被 @ 的消息,限制频率 | |
@bot.register(Group, TEXT) | |
# @freq_limit() | |
def reply_other_group(msg): | |
if msg.chat not in wxpy_groups and msg.is_at: | |
tuling.do_reply(msg) | |
# wxpy 群的消息处理 | |
@bot.register(wxpy_groups) | |
def wxpy_group(msg): | |
if msg.is_at: | |
if remote_kick(msg): | |
return | |
semi_sync(msg, wxpy_groups) | |
# 新人欢迎消息 | |
@bot.register(wxpy_groups, NOTE) | |
def welcome(msg): | |
name = get_new_member_name(msg) | |
if name: | |
return welcome_text.format(name) | |
# update_groups() | |
remote_admin.send('Bot started!') | |
embed() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment