Skip to content

Instantly share code, notes, and snippets.

@yihong0618
Created November 26, 2021 04:24
Show Gist options
  • Save yihong0618/a1f904b3fdbdebaabafa04b5cf7551a8 to your computer and use it in GitHub Desktop.
Save yihong0618/a1f904b3fdbdebaabafa04b5cf7551a8 to your computer and use it in GitHub Desktop.
pythonhunter
from telethon import TelegramClient, sync
from telethon.tl.functions.channels import GetParticipantsRequest
from telethon.tl.functions.messages import GetHistoryRequest

from PIL import Image
from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
import numpy as np

import argparse
from getpass import getpass

API_ID ="xxxx"
API_HASH = "xxxxx"
PHONE_NUM = "xxxxxx"


def connect_telegram(api_id, api_hash, phone_number):
    print("Trying to connect to Telegram...")
    client = TelegramClient("Session", api_id, api_hash)
    if not client.start():
        print("Could not connect to Telegram servers.")
        return None
    else:
        if not client.is_user_authorized():
            print(
                "Session file not found. This is the first run, sending code request..."
            )
            client.sign_in(phone_number)
            self_user = None
            while self_user is None:
                code = input("Enter the code you just received: ")
                try:
                    self_user = client.sign_in(code=code)
                except Exception as e:
                    print(f"Maybe 2FA error for {str(e)}")
                    pw = getpass(
                        "Two step verification is enabled. Please enter your password: "
                    )
                    self_user = client.sign_in(password=pw)
                    raise Exception("Please check your password or 2FA")
    return client


def get_history_messaes_list(client, chat_info):
    offset_id = 0
    limit = 100
    message_list = []

    while 1:
        posts = client(
            GetHistoryRequest(
                peer=chat_info,
                limit=100,
                offset_date=None,
                offset_id=offset_id,
                max_id=0,
                min_id=0,
                add_offset=0,
                hash=0,
            )
        )
        if not posts.messages or len(posts.messages) < limit:
            break
        messages = posts.messages
        offset_id = messages[-1].id
        print(offset_id)
        for message in messages:
            if message.message:
                message_list.append(message.message)
        # for test
    return message_list


def generate_word_cloud(text):
    wc =  WordCloud(background_color="white",
           max_words=2000,
           height=400,
           width=800,
           font_path="hei.ttf",
           max_font_size=50,)
    wc.generate(text)
    wc.to_file("test3.png")



def main():
    client = connect_telegram(API_ID, API_HASH, PHONE_NUM)
    if not client:
        raise Exception("Can not auth telegram")
    print("Getting chat basic info...")
    # uncomments these print line to get the channel or group id
    for d in client.iter_dialogs():
        # print(d.name, d.id)
        pass

    # id 1001492060815 is pythonhunter
    chat_info = client.get_entity(1001492060815)
    message_list = get_history_messaes_list(client, chat_info)
    text = "\n".join(message_list)
    print(len(text))
    generate_word_cloud(text)
    return text


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment