Last active
August 18, 2018 09:39
-
-
Save Sets88/869326a427fe99528b3eaf943e53598d to your computer and use it in GitHub Desktop.
Telegram Client example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from telethon import TelegramClient | |
from telethon.tl.types import MessageActionChatAddUser | |
from telethon.tl.functions.messages import GetAllStickersRequest | |
from telethon.tl.functions.messages import GetStickerSetRequest | |
from telethon.tl.types import InputStickerSetID | |
from operator import attrgetter | |
from time import sleep | |
import traceback | |
from datetime import datetime | |
# Telegram credentials and account settings are read from the environment when the module is imported.
# These four variables are mandatory: a missing one raises KeyError.
API_ID, API_HASH, PHONE, USERNAME = (
    os.environ[variable] for variable in ('TG_API_ID', 'TG_API_HASH', 'TG_PHONE', 'TG_USERNAME')
)
# Optional integer setting which falls back to 0 when TG_ACCESS_HASH is not set.
ACCESS_HASH = int(os.getenv('TG_ACCESS_HASH', '0'))
class InteruptActions(Exception):
    """!Signal raised by an action to stop the remaining actions from being applied to the current message"""
class SuspiciousUsers():
    """!Registry of IDs of users which are currently considered suspicious"""

    def __init__(self, telegramapp):
        """!Create an empty registry
        @param telegramapp Main application object which owns this registry"""
        self.app = telegramapp
        # A set gives constant time membership checks and cannot hold the same ID twice
        self.users = set()

    def joined(self, user_id):
        """!Add a user to the registry, adding an already registered user changes nothing
        @param user_id ID of the user which has to be added"""
        self.users.add(user_id)

    def in_list(self, user_id):
        """!Check if a user is registered
        @param user_id ID of the user which has to be checked
        @return True if the user is in the registry, otherwise False"""
        return user_id in self.users

    def remove(self, user_id):
        """!Remove a user from the registry, removing an unknown user is silently ignored
        @param user_id ID of the user which has to be removed"""
        self.users.discard(user_id)
class Action():
    """!Base class for all actions which have to be applied to every received message

    Subclasses provide check methods whose names start with 'is' and override action() with the reaction."""
    # Sort key for the action; subclasses override it
    rank = 0

    def __init__(self, telegramapp):
        """!Bind the action to the application
        @param telegramapp Main application object, stored as self.app"""
        self.app = telegramapp

    def log(self, *args, **kwargs):
        """!Log message using main app class log method
        @param args Positional arguments passed through to the app log method
        @param kwargs Keyword arguments passed through to the app log method"""
        self.app.log(*args, **kwargs)

    def matching(self, group, message):
        """!Checks if message should be processed by current class object

        Every callable attribute whose name starts with 'is' is treated as a check and is called with
        (group, message). Checks are called in the alphabetical order produced by dir() and the evaluation
        stops at the first check which returns a true value, so the later checks are not called at all.
        @param group Group where message was received
        @param message Message received in group
        @return True if at least one check returned a true value, otherwise False"""
        # Look every attribute up only once; names are filtered first so unrelated attributes are never touched
        checks = (getattr(self, name) for name in dir(self) if name.startswith('is'))
        return any(check(group, message) for check in checks if callable(check))

    def action(self, group, message):
        """!Hook which applies the reaction to a matched message, does nothing in the base class
        @param group Group where message was received
        @param message Message received in group"""

    def process(self, group, message):
        """!Checks if action should be applied and applies it
        @param group Group where message was received
        @param message Message received in group"""
        if self.matching(group, message):
            self.action(group, message)
class KickingAction(Action):
    """!Action which reports potentially bad users to the ban bot

    The checks below are called by Action.matching in alphabetical order: is_just_joined, is_normal_message,
    is_spammer, is_too_long_named. The first two only maintain the list of suspicious users and never match."""
    rank = 1
    # Full names longer than this number of characters are treated as a reason to kick
    MAX_FULL_NAME_LENGTH = 150

    def is_too_long_named(self, group, message):
        """!Checks if full name of the sender of a user join message is longer than MAX_FULL_NAME_LENGTH
        @param group Group where message was received
        @param message Message received in group
        @return True if first name plus last name are longer than the limit, otherwise False"""
        if not isinstance(message.action, MessageActionChatAddUser):
            return False
        sender = message.sender
        # A message may come without a resolved sender, there is no name to measure then
        if sender is None:
            return False
        full_name_length = len(sender.first_name or '') + len(sender.last_name or '')
        return full_name_length > self.MAX_FULL_NAME_LENGTH

    def is_just_joined(self, group, message):
        """!Adds the sender of every user join message into the list of suspicious users
        @param group Group where message was received
        @param message Message received in group
        @return Always False, this check only records the user and never triggers the action"""
        if isinstance(message.action, MessageActionChatAddUser) and message.sender is not None:
            self.app.suspicious_users.joined(message.sender.id)
        return False

    def is_spammer(self, group, message):
        """!Checks if user is a potential spammer: the message is forwarded and its sender is still suspicious
        @param group Group where message was received
        @param message Message received in group
        @return True if user is a potential spammer, otherwise False"""
        if getattr(message, 'fwd_from', None) is None:
            return False
        if message.sender is None:
            return False
        return self.app.suspicious_users.in_list(message.sender.id)

    def is_normal_message(self, group, message):
        """!Removes the sender from suspicious users if message is neither forwarded nor a user join message
        @param group Group where message was received
        @param message Message received in group
        @return Always False, this check only updates the list and never triggers the action"""
        if getattr(message, 'fwd_from', None) is not None:
            return False
        if isinstance(message.action, MessageActionChatAddUser):
            return False
        if message.sender is not None:
            self.app.suspicious_users.remove(message.sender.id)
        return False

    def action(self, group, message):
        """!Report the message to the ban bot and remove its sender from the list of suspicious users

        Sends '@banofbot' into the group as a reply to the offending message.
        @param group Group where message was received
        @param message Message received in group
        @raise InteruptActions always, so that no further action is applied to this message"""
        self.log('kickin %s' % message.sender.first_name)
        self.app.client.send_message(group, '@banofbot', reply_to=message.id)
        self.app.suspicious_users.remove(message.sender.id)
        raise InteruptActions
class GreetAction(Action):
    """!Action which greets every joined user with a sticker"""
    rank = 2
    # Identifiers of the sticker set and of the greeting sticker inside of that set
    STICKER_SET_ID = 1681813578551656450
    STICKER_ID = 1681813578551656492

    def __init__(self, *args, **kwargs):
        """!Initialize the action and fetch the greeting sticker through the app client
        @param args Positional arguments passed through to Action
        @param kwargs Keyword arguments passed through to Action
        @raise IndexError if the sticker set does not contain the greeting sticker"""
        super(GreetAction, self).__init__(*args, **kwargs)
        sticker_set = InputStickerSetID(id=self.STICKER_SET_ID, access_hash=ACCESS_HASH)
        documents = self.app.client(GetStickerSetRequest(stickerset=sticker_set)).documents
        # Stop at the first document with the wanted ID instead of building a full list
        self.sticker = next((document for document in documents if document.id == self.STICKER_ID), None)
        if self.sticker is None:
            raise IndexError('sticker %d not found in sticker set %d' % (self.STICKER_ID, self.STICKER_SET_ID))

    def is_just_joined(self, group, message):
        """!Checks if message is a user join message
        @param group Group where message was received
        @param message Message received in group
        @return True if message action is MessageActionChatAddUser, otherwise False"""
        return isinstance(message.action, MessageActionChatAddUser)

    def action(self, group, message):
        """!Send the greeting sticker as a reply to the user join message
        @param group Group where message was received
        @param message Message received in group"""
        self.log("Sending in reply to %s" % message.id)
        self.app.client.send_file(group, self.sticker, reply_to=message.id)
class TelegramApp():
    """!Main app class which polls a group for new messages and applies all actions to them"""

    def __init__(self):
        """!Connect to Telegram, sign in with a code typed on STDIN if not authorized yet and load all actions"""
        self.client = TelegramClient(USERNAME, API_ID, API_HASH)
        self.actions = []
        self.suspicious_users = SuspiciousUsers(self)
        self.client.connect()
        if not self.client.is_user_authorized():
            self.client.send_code_request(PHONE)
            self.client.sign_in(PHONE, input('Enter the code: '))
        self.load_actions()

    def load_actions(self):
        """!Finds all direct subclasses of Action, initializes them and appends them to the list of actions

        Actions are appended in ascending order of their rank attribute."""
        for action_class in sorted(Action.__subclasses__(), key=attrgetter('rank')):
            self.actions.append(action_class(self))

    def get_dialog_by_name(self, name):
        """!Gets all dialogs of current user and finds requested dialog by name
        @param name Title of dialog to find by
        @return first dialog object whose name matched, None if there is no such dialog"""
        for dialog in self.client.get_dialogs():
            if dialog.name == name:
                return dialog
        return None

    def log(self, message):
        """!Logs events with timestamp to STDOUT
        @param message Text of message which should be put into log"""
        print("%s %s" % (datetime.now().isoformat(), message))

    def get_messages(self, group, **kwargs):
        """!Gets messages list
        @param group Group object to take messages from
        @param kwargs Keyword arguments passed through to the client get_messages method
        @return list of messages"""
        return self.client.get_messages(group, **kwargs)

    def get_last_id(self, messages, last_id=None):
        """!Returns ID of the first message in the list, which is expected to be the most recent one
        @param messages List of messages from which last message ID should be taken
        @param last_id Value to return if the list is empty
        @return ID of the first message in the list, last_id if the list is empty"""
        if not messages:
            return last_id
        return messages[0].id

    def apply_actions(self, group, message):
        """!Applies all actions to message until one of them raises InteruptActions
        @param group Group where message was received
        @param message Message received in group"""
        for action in self.actions:
            try:
                action.process(group, message)
            except InteruptActions:
                break

    def poll(self, group, last_id):
        """!Fetches up to 100 messages newer than last_id and applies all actions to them, oldest first
        @param group Group object to take messages from
        @param last_id ID of the most recent message which has already been seen
        @return ID of the most recent message seen so far"""
        messages = self.get_messages(group, min_id=last_id, limit=100)
        # The list starts with the newest message (get_last_id relies on that too). Actions keep state between
        # messages, e.g. a join has to be seen before a forward of the same user, so replay in chronological order
        for message in reversed(messages):
            if message.id != last_id:
                self.apply_actions(group, message)
        return self.get_last_id(messages, last_id=last_id)

    def run(self):
        """!Gets Belgorod IT dialog and polls it every second for new messages, applying all actions to them
        @raise LookupError if current user has no dialog named Belgorod IT"""
        belgorod_it = self.get_dialog_by_name('Belgorod IT')
        if belgorod_it is None:
            raise LookupError("Dialog 'Belgorod IT' not found")
        # Start from 0 rather than None when the group has no messages yet, min_id has to be a number
        last_id = self.get_last_id(self.get_messages(belgorod_it, limit=1), last_id=0)
        while True:
            last_id = self.poll(belgorod_it, last_id)
            sleep(1)
# Supervisor loop: run the app forever and start it again 10 seconds after any failure
while True:
    print("Starting script")
    app = None
    try:
        app = TelegramApp()
        app.run()
    except Exception:
        print(traceback.format_exc())
    finally:
        # Close the previous client before a new one is created for the same session on the next attempt
        # NOTE(review): assumes disconnect() may be called on a client which failed in the middle of a run
        if app is not None:
            try:
                app.client.disconnect()
            except Exception:
                print(traceback.format_exc())
    sleep(10)
    print("Restarting script")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment