Skip to content

Instantly share code, notes, and snippets.

@lewoudar
Created November 20, 2022 18:26
Show Gist options
  • Save lewoudar/2033b1bc943457fd8430582054be2317 to your computer and use it in GitHub Desktop.
Save lewoudar/2033b1bc943457fd8430582054be2317 to your computer and use it in GitHub Desktop.
An example of how to schedule periodic emailing in python
import json
import os
import tempfile
from pathlib import Path
import emails
import tweepy
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
def send_mail_with_tweets():
# you need to define the following environment variables :
# SMTP_HOST: smtp server where the email will be sent
# SMTP_PORT: port used by the smtp server
# SMTP_TLS: "true" if we want to use TLS "false" otherwise
# SMTP_USER: optional, user login
# SMTP_PASSWORD: optional, user password
smtp = {
'host': os.getenv('SMTP_HOST'),
'port': os.getenv('SMTP_PORT'),
}
if os.getenv('SMTP_TLS', '').lower() == 'true':
smtp.update({
'ssl': True,
'user': os.getenv('SMTP_USER'),
'password': os.getenv('SMTP_PASSWORD')
})
client = tweepy.Client(os.getenv('BEARER_TOKEN'))
with tempfile.TemporaryDirectory() as tmp_dir:
# you may want to replace the extension ".jl" with another one like
# "txt" because some mail providers can delete files with non-standard extension
path = Path(tmp_dir) / 'tweets.jl'
with path.open('w') as f:
for tweet in tweepy.Paginator(
client.search_recent_tweets,
'("black panther" OR #wakandaforever) '
'(magnificent OR amazing OR excellent OR awesome OR great) -is:retweet',
max_results=100
).flatten():
data = {
'id': tweet.id,
'text': tweet.text,
'twitter_url': f'https://twitter/gandi_net/status/{tweet.id}'
}
f.write(f'{json.dumps(data)}\n')
message = emails.Message(
subject='Wakanda Forever tribute',
text='You will find attached a file with all the tweets paying tribute to Black Panther',
mail_from=('Twitter Bot', 'twitter@bot.com')
)
message.attach(filename=path.name, content_disposition='inline', data=open(path, 'rb'))
# replace the destination here with your email address
response = message.send(to='foo@bar.com', smtp=smtp)
# log information if the email was not sent
if response.status_code != 250:
print('There was an error when trying to send email')
# For a more realistic exemple, you should configure a jobstore to save job info
# in a database, so that if the server crashes and has to restart, the sheduler
# will pick up where it left off
scheduler = BlockingScheduler(timezone='utc')
# we will use the crontab notation to schedule our emailing
# it will be done every morning at 9 a.m
scheduler.add_job(send_mail_with_tweets, CronTrigger.from_crontab('0 9 * * 1'))
# the scheduler starts here and the start call is blocking
scheduler.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment