Selenium-based Twitter scraper
#!/usr/bin/env python
'''
selenium_twitter
================
Uses Selenium to extract Tweets from Twitter for a given user.
Note that this violates Twitter's ToS, and you can easily get banned
for doing it. I highly recommend that you do not do this while logged
in, or from the same IP address as your main account.
Sample Usage:
./selenium_twitter.py \
--user kardonice \
--output kardonice.csv \
--save-media \
--format csv \
--proxy socks5://127.0.0.1:9150 \
--headless \
--verbose
Requirements:
Python 3.9+
beautifulsoup4>=4.9
requests>=2.25
python-dateutil>=2.8
selenium>=4.1
Compatible chromedriver and chrome/chromium versions
Optional Requirements:
undetected_chromedriver>=3.1.3
pysocks>=1.7 (required when using a SOCKS5 proxy, e.g. over Tor)
youtube_dl>=2021.05.16 (to download videos)
`undetected_chromedriver` is highly recommended: it is essentially a
drop-in replacement for Selenium that makes the browser much harder
for Twitter's anti-scraping detection to flag.
If converting video formats for Tweets, you may also need ffmpeg:
https://ffmpeg.org/download.html
You can download chromedriver here:
https://chromedriver.chromium.org/downloads
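The Python dependencies can typically be installed with pip; for example
(PyPI package names assumed here, adjust as needed):
    pip install beautifulsoup4 requests python-dateutil selenium \
        undetected-chromedriver pysocks youtube_dl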
NOTE: Twitter Spaces are currently not supported. Contributions adding
support would be greatly appreciated.
'''
__version__ = '0.0.0-dev'
__author__ = 'Alex Huszagh <ahuszagh@gmail.com>'
__license__ = 'Unlicense (Public Domain)'
import argparse
import csv
import datetime
import dateutil.parser
import itertools
import json
import os
import random
import re
import requests
import subprocess
import sys
import time
import urllib.parse
try:
import undetected_chromedriver as webdriver
except ImportError:
from selenium import webdriver
try:
import youtube_dl
except ImportError:
pass
from bs4 import BeautifulSoup
from selenium.common.exceptions import (
NoSuchElementException,
StaleElementReferenceException,
TimeoutException,
WebDriverException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from requests.exceptions import RequestException
from urllib3.exceptions import (
HTTPError,
IncompleteRead,
InvalidChunkLength,
MaxRetryError,
ProtocolError,
ProxyError,
TimeoutError,
)
class TwitterError(Exception):
pass
class StopTwitterError(Exception):
pass
LOAD_EXCEPTIONS = (NoSuchElementException, TimeoutException)
CONNECTION_EXCEPTIONS = (
IncompleteRead,
InvalidChunkLength,
MaxRetryError,
NoSuchElementException,
ProtocolError,
ProxyError,
TimeoutError,
TimeoutException,
TwitterError,
WebDriverException,
)
def print_verbose(message, verbose=True):
if verbose:
print(message)
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def current_date():
now = datetime.datetime.now()
return now.date()
def parse_date(string):
    year, month, day = string.split('-')
    return datetime.date(int(year), int(month), int(day))
def format_date(date):
return f'{date.year}-{date.month:02d}-{date.day:02d}'
def format_datetime(date):
return date.strftime('%a %b %d %H:%M:%S %z %Y')
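# Illustrative example (not from the original code): for a UTC-aware
# timestamp of 2022-11-22 18:00:00, `format_datetime` produces the
# Twitter-style string 'Tue Nov 22 18:00:00 +0000 2022'.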
def add_interval(date, interval):
return datetime.date.fromordinal(date.toordinal() + interval)
def subtract_interval(date, interval):
return datetime.date.fromordinal(date.toordinal() - interval)
def parse_joined_date(string):
return datetime.datetime.strptime(string, '%B %Y').date()
def add_query(url, key, value):
parsed = urllib.parse.urlparse(url)
query = dict(urllib.parse.parse_qsl(parsed.query))
query[key] = [value]
query_string = urllib.parse.urlencode(query, doseq=True)
return urllib.parse.urlunparse(parsed._replace(query=query_string))
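# Illustrative example (assumed URL, not from the original code):
#   add_query('https://twitter.com/search?q=from%3Akardonice', 'f', 'live')
#   -> 'https://twitter.com/search?q=from%3Akardonice&f=live'
# This is how the scraper later switches a search page to its "Latest" tab.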
def parse_interaction_count(string):
lower = string.lower()
if 'b' in lower:
multiplier = 1000000000
lower = lower[:-1]
elif 'm' in lower:
multiplier = 1000000
lower = lower[:-1]
elif 'k' in lower:
multiplier = 1000
lower = lower[:-1]
else:
multiplier = 1
float_str = lower.replace(',', '') or '0'
return int(multiplier * float(float_str))
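# Worked examples (illustrative): '1.2K' -> 1200, '3M' -> 3000000,
# '12,345' -> 12345, and a plain '482' -> 482.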
def random_wait(lower, upper):
'''Wait for a random amount of time to thwart anti-scrape detection.'''
time.sleep(random.uniform(lower, upper))
def retry_scope(callback, count=3, exception=StaleElementReferenceException):
for index in range(count):
try:
return callback()
except exception as err:
error = err
raise error
def get_tweet_id(url):
parsed = urllib.parse.urlparse(url)
return os.path.basename(parsed.path)
def extract_media_data(media_item, tweet_url):
# can only post 1 video at a time, which includes GIFs
# you can post GIFs and photos at the same time, so you
# can have multiple photos and 1 video.
image = extract_photo_data(media_item, tweet_url)
if image is not None:
return image
video = extract_video_data(media_item, tweet_url)
if video is not None:
return video
def extract_photo_data(element, tweet_url):
image = try_find_element(element, By.TAG_NAME, 'img')
if image is None:
return
parsed = urllib.parse.urlparse(image.get_attribute('src'))
query = urllib.parse.parse_qs(parsed.query)
# sometimes the extension is none and is already provided in the path
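# e.g. (illustrative, assumed URL shape) a src such as
# 'https://pbs.twimg.com/media/ABC123?format=jpg&name=small'
# becomes the path '/media/ABC123.jpg' with the query string dropped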
extension = query.get('format')
path = parsed.path
if extension is not None:
path = f'{path}.{extension[0]}'
url = parsed._replace(path=path, query='')
return {
'type': 'photo',
'media_url': urllib.parse.urlunparse(url._replace(scheme='http')),
'media_url_https': urllib.parse.urlunparse(url),
}
def extract_video_data(element, tweet_url):
# don't search for the video directly, since it might not render
# if that's the case, the `<video>` tag won't appear.
video_selector = '*[data-testid="videoPlayer"]'
has_video = has_element(element, By.CSS_SELECTOR, video_selector)
if not has_video:
return
parsed = urllib.parse.urlparse(tweet_url)
return {
'type': 'video',
'media_url': urllib.parse.urlunparse(parsed._replace(scheme='http')),
'media_url_https': tweet_url,
}
def extract_tweet_data(session, url, user_info, args):
timeout = int(args.timeout)
short_timeout = int(args.short_timeout)
open_in_tab(session, url)
random_wait(0.1, 0.15)
id_str = get_tweet_id(url)
tweet_selector = f'article[data-testid="tweet"] a[href$="{id_str}"]'
text_selector = '*[data-testid="tweetText"]'
time_selector = 'a[role="link"] > time'
retweet_selector = 'a[href$="/retweets"]'
quote_selector = 'a[href$="/retweets/with_comments"]'
favorite_selector = 'a[href$="/likes"]'
transitive_text_selector = '*[data-testid="app-text-transition-container"]'
sources_url = 'https://help.twitter.com/using-twitter/how-to-tweet#source-labels'
source_selector = f'a[href="{sources_url}"][rel="noopener noreferrer nofollow"]'
media_selector = '*[data-testid="tweetPhoto"]'
data = {}
data['user'] = user_info.copy()
wait = WebDriverWait(session, timeout)
short_wait = WebDriverWait(session, short_timeout)
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, tweet_selector)))
tweet = session.find_element(By.CSS_SELECTOR, tweet_selector)
while tweet is not None and tweet.tag_name != 'article':
tweet = tweet.find_element(by=By.XPATH, value='..')
if tweet is None:
raise TwitterError(f'Unable to find Tweet body for url "{url}".')
try:
short_wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, text_selector)))
text_elements = tweet.find_elements(By.CSS_SELECTOR, text_selector)
except LOAD_EXCEPTIONS:
text_elements = []
# empty tweets can have a missing element
if text_elements:
text = text_elements[0]
data['text'] = text.text
else:
data['text'] = ''
data['id_str'] = id_str
data['id'] = int(id_str)
time = tweet.find_element(By.CSS_SELECTOR, time_selector)
timestamp = time.get_attribute('datetime')
date = dateutil.parser.parse(timestamp)
data['created_at'] = format_datetime(date)
# these all may be missing (they're nullable)
try:
retweets = session.find_element(By.CSS_SELECTOR, retweet_selector)
retweets = retweets.find_element(By.CSS_SELECTOR, transitive_text_selector)
data['retweet_count'] = parse_interaction_count(retweets.text)
except NoSuchElementException:
pass
try:
quotes = session.find_element(By.CSS_SELECTOR, quote_selector)
quotes = quotes.find_element(By.CSS_SELECTOR, transitive_text_selector)
data['quote_count'] = parse_interaction_count(quotes.text)
except NoSuchElementException:
pass
try:
favorites = session.find_element(By.CSS_SELECTOR, favorite_selector)
favorites = favorites.find_element(By.CSS_SELECTOR, transitive_text_selector)
data['favorite_count'] = parse_interaction_count(favorites.text)
except NoSuchElementException:
pass
data['is_quote_status'] = len(text_elements) > 1
try:
source = session.find_element(By.CSS_SELECTOR, source_selector)
html = source.get_attribute('outerHTML')
soup = BeautifulSoup(html, 'html.parser')
for attr in ['target', 'class', 'role']:
soup.a.attrs.pop(attr)
data['source'] = str(soup.a)
except NoSuchElementException:
pass
try:
media = tweet.find_elements(By.CSS_SELECTOR, media_selector)
data['entities'] = {}
data['entities']['media'] = []
for media_item in media:
item = extract_media_data(media_item, url)
if item is not None:
data['entities']['media'].append(item)
except NoSuchElementException:
pass
close_tab(session)
return data
def open_in_tab(session, url):
# this requires `--disable-popup-blocking` to be set
session.execute_script('window.open();')
handle = session.window_handles[-1]
session.switch_to.window(handle)
session.get(url)
def close_tab(session):
session.execute_script('window.close();')
handle = session.window_handles[-1]
session.switch_to.window(handle)
def parse_memo(location, args):
memo = set()
if os.path.exists(location.file_path):
globals()[f'read_memo_{args.format}'](location, memo)
return memo
def read_memo_csv(location, memo):
with open(location.file_path, newline='') as file:
reader = csv.DictReader(file, dialect='excel-tab')
for row in reader:
memo.add(row['id_str'])
def read_memo_json(location, memo):
with open(location.file_path) as file:
for line in file:
data = json.loads(line)
memo.add(data['id_str'])
def check_cycle_proxy(proxies, session, force, args):
    proxy = None
    if session is None:
        # no session yet: connect through the next (or only) proxy, if any
        proxy = next(proxies) if hasattr(proxies, '__next__') else proxies
        session = Session(get_session(proxy, args))
    elif hasattr(proxies, '__next__'):
        # multiple proxies were given: rotate to the next one
        proxy = next(proxies)
        print_verbose(f'Cycling proxy to {proxy}.')
        session.close()
        session._session = get_session(proxy, args)
    elif force:
        # single (or no) proxy: recreate the browser session in-place
        proxy = proxies
        session.close()
        session._session = get_session(proxy, args)
    return (session, proxy)
def get_options(proxy, args):
options = webdriver.ChromeOptions()
if proxy:
print_verbose(f'Connecting to proxy at {proxy}.', args.verbose)
options.add_argument(f'--proxy-server={proxy}')
# Run in a headless session (useful when we know this logic works).
if getattr(args, 'headless', None):
options.add_argument('--headless')
# ensure we disable popup blocking for our tabs
options.add_argument('--disable-popup-blocking')
return options
def get_session(proxy, args):
'''Get a pre-configured Selenium driver session.'''
options = get_options(proxy, args)
# If the page is slow to load or never does, use a custom strategy
capabilities = DesiredCapabilities().CHROME
capabilities['pageLoadStrategy'] = getattr(args, 'page_load_strategy', 'normal')
# ensure we're using the correct major version.
# this only works with the undetected chromedriver.
kwds = {}
if 'undetected_chromedriver' in sys.modules:
output = subprocess.check_output(['chromedriver', '--version']).decode('utf-8')
version = int(re.match(r'^ChromeDriver (\d+)\.\d+', output).group(1))
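# e.g. (illustrative) an output line such as 'ChromeDriver 107.0.5304.62 (...)'
# is reduced to the major version 107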
kwds['version_main'] = version
print_verbose('Getting Chrome browser session', args.verbose)
return webdriver.Chrome(
options=options,
desired_capabilities=capabilities,
**kwds,
)
def try_find_element(parent, by, selector):
try:
return parent.find_element(by, selector)
except NoSuchElementException:
pass
def has_element(parent, by, selector):
return try_find_element(parent, by, selector) is not None
def get_user_info(session, screen_name):
script_selector = 'script[data-testid="UserProfileSchema-test"]'
verified_selector = 'svg[aria-label="Verified account"]'
script = session.find_element(By.CSS_SELECTOR, script_selector)
data = script.get_attribute('innerText')
user = json.loads(data)
author = user['author']
profile_image_url_https = urllib.parse.urlparse(author['image']['contentUrl'])
profile_image_url = profile_image_url_https._replace(scheme='http')
result = {
'id': int(author['identifier']),
'id_str': author['identifier'],
'description': author['description'],
'name': author['givenName'],
'screen_name': screen_name,
'created_at': user['dateCreated'],
'profile_image_url': urllib.parse.urlunparse(profile_image_url),
'profile_image_url_https': urllib.parse.urlunparse(profile_image_url_https),
}
for item in author['interactionStatistic']:
if item['name'] == 'Friends':
result['friends_count'] = item['userInteractionCount']
elif item['name'] == 'Follows':
result['followers_count'] = item['userInteractionCount']
is_verified = has_element(session, By.CSS_SELECTOR, verified_selector)
result['is_verified'] = is_verified
return result
def get_tweets(session, query, args):
tweet_selector = 'article[data-testid="tweet"]'
no_tweets_selector = '*[data-testid="empty_state_header_text"]'
short_timeout = int(args.short_timeout)
short_wait = WebDriverWait(session, short_timeout)
tweets = None
try:
short_wait.until(EC.any_of(
EC.presence_of_element_located((By.CSS_SELECTOR, tweet_selector)),
EC.presence_of_element_located((By.CSS_SELECTOR, no_tweets_selector)),
))
if has_element(session, By.CSS_SELECTOR, tweet_selector):
tweets = session.find_elements(By.CSS_SELECTOR, tweet_selector)
elif has_element(session, By.CSS_SELECTOR, no_tweets_selector):
tweets = []
except NoSuchElementException:
pass
except TimeoutException:
print(f'Unable to fetch Tweets within timeout for query "{query}", maybe increase the timeout?')
return tweets
def process_tweets(session, tweets, user_info, memo, args):
link_selector = 'a[role="link"] > time'
processed = 0
for tweet in tweets:
# find the link via the date element
link_time = tweet.find_element(By.CSS_SELECTOR, link_selector)
link = link_time.find_element(by=By.XPATH, value='..')
tweet_url = link.get_attribute('href')
id_str = get_tweet_id(tweet_url)
if id_str in memo:
print_verbose(f'Got duplicate Tweet with ID "{id_str}".', args.verbose)
continue
# now, process all the tweet data
print_verbose(f'Processing tweet at URL "{tweet_url}".', args.verbose)
processed += 1
memo.add(id_str)
cb = lambda: extract_tweet_data(session, tweet_url, user_info, args)
yield retry_scope(cb)
if processed == 0:
# ensure we mark there were no tweets processed
tweets.clear()
def get_statuses_impl(location, memo, session, until, args):
interval = int(args.interval)
session.get(f'https://twitter.com/{args.user}')
# first, check if our page actually loaded
try:
timeline_selector = '*[aria-label="Home timeline"]'
home_timeline = session.find_element(By.CSS_SELECTOR, timeline_selector)
if 'Something went wrong. Try reloading.' in home_timeline.text:
raise TwitterError('Unable to load tweets, Twitter cannot be accessed.')
except NoSuchElementException as error:
raise TwitterError('Unable to get home timeline') from error
# now, get the start: when the account was created
# change this into YYYY-MM-DD format.
user_info = get_user_info(session, args.user)
joined = dateutil.parser.parse(user_info['created_at']).date()
# now, need we need to process tweets iteratively,
# and log the current date range for each
search_selector = 'input[data-testid="SearchBox_Search_Input"]'
wait = WebDriverWait(session, int(args.timeout))
while until > joined:
# generate our search query
since = max(subtract_interval(until, interval), joined)
query = f'from:{args.user} until:{format_date(until)} since:{format_date(since)}'
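# e.g. (illustrative) 'from:kardonice until:2022-11-27 since:2022-11-20'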
until = since
print_verbose(f'Running search query of "{query}".', args.verbose)
# enter our search terms: must clear field. using `clear` no longer
# works, so we must send keys manually
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, search_selector)))
element = session.find_element(By.CSS_SELECTOR, search_selector)
element.send_keys(Keys.CONTROL + 'a')
element.send_keys(Keys.DELETE)
element.send_keys(query)
element.send_keys(Keys.RETURN)
# now, shift to the latest tab. the easiest way for this
# is to get the URL, and add f=live for the parameters
url = session.current_url
live_url = add_query(url, 'f', 'live')
print_verbose(f'Going to live url "{live_url}".', args.verbose)
session.get(live_url)
# now, need to iterate over all tweets
# twitter dynamically loads tweets so we
# need to scroll into view and store which
# tweets we've processed.
got_tweets = True
while got_tweets:
# wrap this in a short little wrapper so if we fail, then we can reload
for _ in range(3):
tweets = get_tweets(session, query, args)
if tweets:
break
elif tweets is None:
continue
got_tweets = False
if tweets is not None:
yield from process_tweets(session, tweets, user_info, memo, args)
got_tweets = len(tweets) != 0
# now, we need to scroll towards the end
if got_tweets:
session.execute_script('arguments[0].scrollIntoView();', tweets[-1])
time.sleep(0.4)
# now need to yield since it's a safe time to cycle proxies
yield True
# NOTE: cannot use `StopIteration` since those raise a
# runtime error when used within a generator.
raise StopTwitterError('Completed all Tweets through account creation date.')
def get_statuses(location, args):
memo = parse_memo(location, args)
processed_total = 0
consecutive_tries = 0
until = parse_date(args.until)
cycle_count = int(args.cycle_count)
restart_timeout = int(args.restart_timeout)
tweet = None
session = None
force = False
proxy = None
proxies = getattr(args, 'proxy', None)
if isinstance(proxies, list) and len(proxies) == 1:
proxies = proxies[0]
elif isinstance(proxies, list) and len(proxies) > 1:
proxies = itertools.cycle(proxies)
while True:
try:
session, proxy = check_cycle_proxy(proxies, session, force, args)
force = False
processed_total = 0
for tweet in get_statuses_impl(location, memo, session, until, args):
is_bool = isinstance(tweet, bool)
if is_bool and processed_total >= cycle_count:
# will cycle proxy on the next loop
break
elif not is_bool:
parsed_date = dateutil.parser.parse(tweet['created_at']).date()
until = add_interval(parsed_date, 1)
yield (tweet, proxy)
processed_total += 1
consecutive_tries = 0
except CONNECTION_EXCEPTIONS:
consecutive_tries += 1
print_verbose(f'Got connection error, retrying, done {consecutive_tries} consecutive tries.')
if consecutive_tries > 2 and args.restart_on_failure:
eprint(f'\033[31mError:\033[0m Restarting on failure, sleeping for {restart_timeout} seconds.')
time.sleep(restart_timeout)
consecutive_tries = 0
force = True
elif consecutive_tries > 2:
raise
elif tweet is not None:
pass
force = True
except StopTwitterError:
print_verbose('Completed all Tweets.', args.verbose)
break
except (HTTPError, WebDriverException):
if not args.restart_on_failure:
raise
eprint(f'\033[31mError:\033[0m Restarting on failure, sleeping for {restart_timeout} seconds.')
time.sleep(restart_timeout)
consecutive_tries = 0
force = True
SIMPLE_USER_FIELDS = [
'created_at',
'description',
'entities',
'followers_count',
'friends_count',
'id',
'id_str',
'name',
'screen_name',
'url',
'verified',
]
SIMPLE_STATUS_FIELDS = [
'created_at',
'favorite_count',
'id',
'id_str',
'is_quote_status',
'quote_count',
'retweet_count',
'retweeted',
'source',
'text',
# NOTE: we currently do not support finding the reply IDs or
# quote IDs, since it's not very well marked in the UI.
]
def extract_fields(item, fields):
if fields is None:
return item
return {k: item.get(k) for k in fields}
def write_items(location, iterable, args):
restart_timeout = int(args.restart_timeout)
os.makedirs(location.parent, exist_ok=True)
if args.save_media:
os.makedirs(location.directory_path, exist_ok=True)
consecutive_tries = 0
while True:
try:
globals()[f'write_{args.format}'](location, iterable, args)
return
except RequestException:
consecutive_tries += 1
if consecutive_tries > 2 and args.restart_on_failure:
eprint(f'\033[31mError:\033[0m Restarting on failure, sleeping for {restart_timeout} seconds.')
time.sleep(restart_timeout)
consecutive_tries = 0
elif consecutive_tries > 2:
raise
def write_csv(location, iterable, args):
with open(location.file_path, 'a', newline='') as file:
writer = None
fields = args.fields or SIMPLE_STATUS_FIELDS
for index, (item, proxy) in enumerate(iterable):
if writer is None:
writer = csv.DictWriter(file, fieldnames=fields, dialect='excel-tab')
writer.writeheader()
# save media prior to item, so we ensure it gets written
if args.save_media:
save_media_urls(location, item, proxy, args)
print_verbose(f'Writing status {item["id_str"]}', args.verbose)
data = extract_fields(item, fields)
writer.writerow(data)
if index % 10 == 0:
file.flush()
def write_json(location, iterable, args):
# this writes it as a series of JSON objects, to avoid failing to write to disk
with open(location.file_path, 'a') as file:
fields = args.fields
for index, (item, proxy) in enumerate(iterable):
# save media prior to item, so we ensure it gets written
if args.save_media:
save_media_urls(location, item, proxy, args)
print_verbose(f'Writing status {item["id_str"]}', args.verbose)
data = extract_fields(item, fields)
file.write(json.dumps(data) + '\n')
if index % 10 == 0:
file.flush()
def save_media_urls(location, item, proxy, args):
media = item.get('entities', {}).get('media', [])
media += item.get('extended_entities', {}).get('media', [])
for media_item in media:
save_media_item(location, item, media_item, proxy, args)
def save_media_item(location, item, media_item, proxy, args):
url = media_item.get('media_url_https')
filetype = media_item.get('type')
if url is not None:
globals()[f'save_media_{filetype}'](location, url, item, proxy, args)
def save_media_photo(location, url, item, proxy, args):
    parsed = urllib.parse.urlparse(url)
    filename = os.path.basename(parsed.path)
    unique_filename = f'{item["id_str"]}-{filename}'
    print_verbose(f'Saving photo at url "{url}" with unique ID {unique_filename}.')
    kwds = {}
    if proxy:
        kwds['proxies'] = {
            'http': proxy,
            'https': proxy,
        }
    response = requests.get(url, **kwds)
    if not response.ok:
        # skip writing the response body if the download failed
        eprint(f'\033[31mError:\033[0m Unable to save media attachment at url "{url}".')
        return
    path = os.path.join(location.directory_path, unique_filename)
    with open(path, 'wb') as file:
        file.write(response.content)
def save_media_video(location, url, item, proxy, args):
    if 'youtube_dl' not in sys.modules:
        eprint('\033[1;33mWarning:\033[0m Unable to save video: youtube-dl is not installed.')
        return
    # videos can either be `mp4` or `m3u8_native` (a plain-text playlist
    # of native media files), but we always convert to an `mp4`.
    unique_filename = f'{item["id_str"]}.mp4'
    path = os.path.join(location.directory_path, unique_filename)
    print_verbose(f'Saving video at url "{url}" with unique ID {unique_filename}.')
    # do not shadow the `args` parameter, and only pass `--proxy` when one is set
    argv = ['--format', 'mp4', '--output', path]
    if proxy:
        argv += ['--proxy', proxy]
    argv.append(url)
    try:
        youtube_dl.main(argv)
    except SystemExit as exit:
        # youtube-dl calls `sys.exit`, which we need to catch.
        # re-raise if we have an unsuccessful exit
        if exit.code != 0:
            raise exit
class Session:
    '''Shallow wrapper so we can modify sessions in-place.'''

    def __init__(self, session):
        self._session = session

    def __getattr__(self, attr):
        # `__getattr__` is only invoked when normal lookup fails,
        # so simply delegate to the wrapped Selenium driver.
        return getattr(self._session, attr)
class OutputLocation:
    __slots__ = ('parent', 'filename', 'extension')

    def __init__(self, path, file_format):
        realpath = os.path.realpath(path)
        self.parent = os.path.dirname(realpath)
        basename = os.path.basename(realpath)
        self.filename, self.extension = os.path.splitext(basename)
        if not self.extension:
            self.extension = f'.{file_format}'

    @property
    def file_path(self):
        return os.path.join(self.parent, f'{self.filename}{self.extension}')

    @property
    def directory_path(self):
        return os.path.join(self.parent, self.filename)
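# Illustrative example (hypothetical path): OutputLocation('kardonice.csv', 'csv')
# exposes `file_path` ending in 'kardonice.csv' (the tweet export) and
# `directory_path` ending in 'kardonice' (where media attachments are saved).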
def main():
'''Parse the command-line arguments.'''
today = current_date()
tomorrow = add_interval(today, 1)
parser = argparse.ArgumentParser(description='Twitter Selenium exporter parameters.')
parser.add_argument(
'-u',
'--user',
help='Screen name of user to get Tweets from.',
required=True,
)
parser.add_argument(
'-o',
'--output',
help='Output file name, the extension will be added if not provided.',
)
parser.add_argument(
'-V',
'--version',
action='version',
version=f'%(prog)s {__version__}'
)
parser.add_argument(
'-sm',
'--save-media',
action='store_true',
help='Save media attachments. The directory name defaults to the filename.',
)
parser.add_argument(
'-f',
'--fields',
help='Fields to extract from each item. Leave empty for all',
nargs='*',
)
parser.add_argument(
'--format',
help='Export format.',
default='json',
choices=['json', 'csv'],
)
parser.add_argument(
'-p',
'--proxy',
help='Proxy server to connect to Twitter over.',
nargs='*',
)
parser.add_argument(
'-v',
'--verbose',
action='store_true',
help='Print verbose debugging information.',
)
parser.add_argument(
'--headless',
action='store_true',
help='Run in headless (no UI) mode.',
)
parser.add_argument(
'--until',
help='The date in YYYY-MM-DD format.',
default=format_date(tomorrow),
)
parser.add_argument(
'--interval',
help='The number of days to generate the since/until date ranges.',
default='7',
)
parser.add_argument(
'--timeout',
help='Timeout (in seconds) for an element to load after page load.',
default='240',
)
parser.add_argument(
'--short-timeout',
help='Short timeout (in seconds) waiting elements to dynamically load on a loaded site.',
default='10',
)
parser.add_argument(
'--cycle-count',
help='Number of Tweets to process before cycling proxies.',
default='240',
)
parser.add_argument(
'--restart-on-failure',
action='store_true',
help='Restart automatically, infinitely, if the command fails.',
)
parser.add_argument(
'--restart-timeout',
help='Timeout, in seconds, to sleep before retrying on failure.',
default='600',
)
args = parser.parse_args()
if not args.proxy:
eprint('\033[1;33mWarning:\033[0m It is highly recommended to use a proxy server.')
output = args.output
if output is None:
output = f'{args.user}_statuses.{args.format}'
location = OutputLocation(output, args.format)
iterable = get_statuses(location, args)
write_items(location, iterable, args)
if __name__ == '__main__':
main()