Skip to content

Instantly share code, notes, and snippets.

@mhz-tamb
Created January 9, 2017 07:29
Show Gist options
  • Save mhz-tamb/f54817c2536b86529574d8ed60b4e2fb to your computer and use it in GitHub Desktop.
Save mhz-tamb/f54817c2536b86529574d8ed60b4e2fb to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from bs4 import BeautifulSoup
from imaplib import Commands, IMAP4
import argparse
import configparser
import email
import urllib.request
import urllib.parse
import time
import datetime
# Register the IDLE extension (RFC 2177) in imaplib's command table. Each entry
# holds the connection states in which imaplib allows the command to be issued.
Commands['IDLE'] = ('AUTH', 'SELECTED')
# The value has to be a real tuple: ('IDLE') without the trailing comma is just
# the string 'IDLE', which would turn a `state in Commands[name]` membership
# test into a substring match.
Commands['DONE'] = ('IDLE',)
class IMAP(IMAP4):
    """IMAP4 client with a minimal implementation of the IDLE command.

    ``idleBegin`` / ``idleDone`` enter and leave IDLE mode, and ``idle`` runs a
    blocking loop that invokes a callback whenever the server announces new
    mail (an ``EXISTS`` line) or the idle timeout has elapsed.
    """

    # Timestamp (time.time()) of the moment IDLE mode was last entered.
    __activityTime = 0
    # True while an IDLE command is outstanding on the connection.
    __idleMode = False
    # Tag of the outstanding IDLE command; needed to match its completion.
    __idleTag = None

    def __updateActivityTime(self):
        """Remember the current time as the start of the idle period."""
        self.__activityTime = time.time()

    def idleBegin(self):
        """Send IDLE and wait for the server's first response to it.

        Raises:
            IMAP4.error: if IDLE mode is already active or the server does not
                list ``IDLE`` among its capabilities.
        """
        # Explicit raise instead of assert: asserts vanish under ``python -O``.
        if self.__idleMode:
            raise self.error("IDLE mode is already active.")
        if "IDLE" not in self.capabilities:
            raise self.error("Server does not support IDLE command.")
        self.__idleTag = self._command("IDLE")
        self._get_response()
        self.__updateActivityTime()
        self.__idleMode = True

    def idleDone(self):
        """Send DONE and wait for the tagged completion of the IDLE command.

        Returns:
            The result code reported for the IDLE command.

        Raises:
            IMAP4.error: if IDLE mode is not active.
        """
        if not self.__idleMode:
            raise self.error("IDLE mode is not active.")
        self.send(b'DONE\r\n')
        result, data = self._command_complete("IDLE", self.__idleTag)
        self.__idleMode = False
        return result

    def idle(self, callback, timeout=29 * 60):
        """Idle on the connection, calling *callback* on new mail or timeout.

        Args:
            callback: zero-argument callable. It runs outside IDLE mode, so it
                may issue regular IMAP commands on this connection.
            timeout: seconds after which IDLE is left and re-entered even
                without new mail. ``readline()`` blocks, so expiry is only
                noticed when the server sends a line.

        Returns:
            False once the server sends ``* BYE`` or an empty line is read.
            The loop has no other exit.
        """
        self.idleBegin()
        while True:
            response = self.readline().strip()
            if not response or response.startswith(b'* BYE '):
                return False
            expired = time.time() - self.__activityTime >= timeout
            if response.endswith(b'EXISTS') or expired:
                self.idleDone()
                callback()
                self.noop()
                # Re-entering IDLE also restarts the timeout clock.
                self.idleBegin()
# --- Command line -----------------------------------------------------------
parser = argparse.ArgumentParser()
# NOTE(review): --daemonize is parsed but nothing visible in this file reads it.
parser.add_argument('-d', '--daemonize', action='store_true', help='daemonize application')
parser.add_argument('-c', '--config', default='config.ini', help='path to config file (default config.ini)')
args = parser.parse_args()

# --- Configuration ----------------------------------------------------------
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail early instead of hitting a KeyError later.
if not config.read(args.config):
    parser.error('cannot read config file: {0}'.format(args.config))

# --- IMAP session -----------------------------------------------------------
imap = IMAP(config['imap']['host'])
# Switch to TLS before the credentials are sent.
imap.starttls()
imap.login(config['imap']['user'], config['imap']['password'])
imap.select(config['imap']['folder'], readonly=True)

# checkMail() only handles messages whose UID is greater than this value.
last_uid = 0
def _extractBody(message):
    """Return the decoded text of the last ``text/*`` leaf part of *message*.

    Containers, attachments and non-text parts are skipped. When several text
    parts exist the last one wins; its markup, if any, is stripped by the
    caller. Returns an empty string if no text part is found.
    """
    body = ''
    for part in message.walk():
        # walk() already descends into containers; only leaves carry content.
        if part.is_multipart() or part.get_content_maintype() != 'text':
            continue
        if part.get_content_disposition() == 'attachment':
            continue
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        charset = part.get_content_charset() or 'utf-8'
        try:
            body = payload.decode(charset, errors='replace')
        except LookupError:
            # The declared charset is unknown to Python; fall back to UTF-8.
            body = payload.decode('utf-8', errors='replace')
    return body


def checkMail():
    """Forward new mail from the configured sender to the Telegram chat.

    Searches the selected folder for messages from ``config['imap']['from']``
    sent since today, and for every message whose UID is above ``last_uid``
    fetches it, reduces it to plain text, prints it and posts it through the
    Telegram ``sendMessage`` endpoint. ``last_uid`` is advanced after each
    handled message so the same mail is not forwarded again on the next call.
    """
    global last_uid
    # Local import so this function does not depend on the file's import list.
    import html

    print('test ' + str(time.time()))
    criteria = '(FROM "{0}" SENTSINCE {1})'.format(
        config['imap']['from'],
        datetime.date.today().strftime('%d-%b-%Y')
    )
    result, found = imap.uid('search', None, criteria)
    # No SEARCH data (None) or an empty result means there is nothing to do.
    if result != 'OK' or not found or not found[0]:
        return

    for msg_uid in found[0].split():
        uid = int(msg_uid)
        if uid <= last_uid:
            continue
        result, fetched = imap.uid('fetch', msg_uid, '(RFC822)')
        if result != 'OK' or not fetched or not isinstance(fetched[0], tuple):
            continue
        # Parse from bytes: the raw message is not guaranteed to be UTF-8.
        message = email.message_from_bytes(fetched[0][1])
        body = BeautifulSoup(_extractBody(message), 'html.parser').get_text().strip()
        print(body)
        if body:
            params = {
                'chat_id': config['telegram']['chat_id'],
                # The text is plain at this point, but parse_mode is HTML, so
                # literal <, > and & must be escaped or Telegram rejects it.
                'text': html.escape(body, quote=False),
                'parse_mode': 'HTML',
                'disable_web_page_preview': False
            }
            url = 'https://api.telegram.org/bot' + config['telegram']['token'] + '/sendMessage'
            # The context manager closes the HTTP response once the call is done.
            with urllib.request.urlopen(url, urllib.parse.urlencode(params).encode('utf-8')):
                pass
        last_uid = uid
# The configured value is multiplied by 60 (presumably minutes) to get the
# number of seconds idle() compares against elapsed time.
idle_timeout = int(config['imap']['timeout']) * 60
try:
    # Blocks until the server ends the session or an error is raised.
    imap.idle(checkMail, idle_timeout)
finally:
    # Always attempt an orderly shutdown, even when the idle loop raised;
    # logout() is still attempted if close() itself fails.
    try:
        imap.close()
    finally:
        imap.logout()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment