Last active
March 14, 2022 08:09
-
-
Save srhinos/03edc73cbcf0bdd70f5c6e78457f7546 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import functools | |
import logging | |
import os | |
from concurrent.futures import ThreadPoolExecutor | |
import youtube_dl | |
from .digital_ocean import DropletManager | |
allow_requests = True
log = logging.getLogger(__name__)

# Base options for every YoutubeDL instance built by Downloader below.
ytdl_format_options = dict(
    format="bestaudio/best",
    outtmpl="%(extractor)s-%(id)s-%(title)s.%(ext)s",
    restrictfilenames=True,
    noplaylist=True,
    nocheckcertificate=True,
    ignoreerrors=False,
    logtostderr=False,
    quiet=True,
    no_warnings=True,
    default_search="auto",
    source_address="0.0.0.0",
    forceipv4=True,
    proxy=None,
    usenetrc=True,
)


def _empty_bug_reports_message():
    """Stand-in for youtube-dl's bug-report blurb: always the empty string."""
    return ""


# youtube-dl's bug-report message produces two link embeds when errors are
# relayed to users and only confuses them, so blank it out.
youtube_dl.utils.bug_reports_message = _empty_bug_reports_message

# Why there are several YoutubeDL objects: catching youtube-dl errors for their
# useful messages requires `ignoreerrors` off, while surviving problematic
# videos (rentals, etc.) requires it on. Toggling the flag on one shared object
# is unsafe with concurrent async callers, so separate objects are built with
# each setting.
class Downloader:
    """Run youtube-dl extractions on a thread pool, one fresh YoutubeDL per attempt.

    youtube-dl errors are only informative when ``ignoreerrors`` is off, but
    ``ignoreerrors`` must be on to survive problematic entries (rentals, etc.).
    Toggling the flag on a shared instance is unsafe with concurrent async
    callers, so downloaders are built on demand:

    * "unsafe": base options with a droplet applied via
      ``DropletManager.affix_ip``; ``ignoreerrors`` stays as in the base options.
    * "safe": same, but with ``ignoreerrors`` forced to True.
    * "clean": base options only (``affix_ip`` is not called).

    When an extraction error on a droplet-bound downloader matches
    ``BLOCKAGE_MARKERS``, that droplet is deleted and the extraction is retried
    on another randomly chosen droplet. Retries are not bounded.
    """

    # Substrings of a youtube-dl error message treated as "this droplet's IP is
    # blocked". A match deletes the droplet and retries on a new one.
    BLOCKAGE_MARKERS = ("Unable to extract", "429", "403", "Errno 111")

    # Seconds between checks while another coroutine is running initialize().
    INIT_POLL_INTERVAL = 5

    def __init__(self, download_folder=None):
        self.thread_pool = ThreadPoolExecutor(max_workers=2)
        self.droplet_manager = DropletManager(self.thread_pool)
        self.download_folder = download_folder
        self.last_safe_ytdl = None
        self.initialized = False
        self.initializing = False

    @property
    def ytdl(self):
        """The most recently built "safe" YoutubeDL (None before initialization)."""
        return self.last_safe_ytdl

    async def initialize(self):
        """Initialize the droplet manager and build the first "safe" downloader.

        ``initializing`` is cleared even if this raises, so later callers of
        ``extract_info`` retry initialization instead of polling forever.
        """
        self.initializing = True
        try:
            await self.droplet_manager.initialize_manager()
            self.last_safe_ytdl, _ = await self.build_ytdl_downloader("safe")
            self.initialized = True
        finally:
            self.initializing = False

    def _apply_download_folder(self, ytdl_downloader):
        """Prefix the downloader's ``outtmpl`` with ``download_folder`` when one is set."""
        if self.download_folder:
            ytdl_downloader.params["outtmpl"] = os.path.join(
                self.download_folder, ytdl_downloader.params["outtmpl"]
            )
        return ytdl_downloader

    async def build_ytdl_downloader(self, downloader_type):
        """Build a YoutubeDL bound to a random droplet.

        Args:
            downloader_type: ``"safe"`` forces ``ignoreerrors=True`` and records
                the result in ``last_safe_ytdl``; any other value keeps the base
                ``ignoreerrors`` setting.

        Returns:
            ``(ytdl_downloader, droplet_id)``.
        """
        current_droplet_id = await self.droplet_manager.get_random_droplet()
        # NOTE(review): confirm affix_ip returns a new dict rather than mutating
        # the shared module-level ytdl_format_options.
        params = await self.droplet_manager.affix_ip(
            ytdl_format_options, current_droplet_id
        )
        ytdl_downloader = self._apply_download_folder(youtube_dl.YoutubeDL(params))
        if downloader_type == "safe":
            ytdl_downloader.params["ignoreerrors"] = True
            self.last_safe_ytdl = ytdl_downloader
        return ytdl_downloader, current_droplet_id

    async def build_clean_ytdl_downloader(self):
        """Build a YoutubeDL from the base options, without calling ``affix_ip``."""
        return self._apply_download_folder(youtube_dl.YoutubeDL(ytdl_format_options))

    async def _build_unsafe_attempt(self):
        """Build an "unsafe" droplet-bound downloader and log where it runs."""
        ytdl_downloader, droplet_id = await self.build_ytdl_downloader("unsafe")
        droplet_info = await self.droplet_manager.get_droplet_info(droplet_id)
        log.info(
            'downloader built on region %s using IP "%s"',
            droplet_info.get("region"),
            droplet_info.get("ip_address"),
        )
        return ytdl_downloader, droplet_id

    async def _build_safe_attempt(self):
        """Build a "safe" droplet-bound downloader."""
        return await self.build_ytdl_downloader("safe")

    async def _build_clean_attempt(self):
        """Build a clean downloader; it has no droplet, so the id is None."""
        return await self.build_clean_ytdl_downloader(), None

    async def _wait_until_initialized(self):
        """Initialize on first use, or poll while another coroutine is initializing."""
        while not self.initialized:
            if self.initializing:
                await asyncio.sleep(self.INIT_POLL_INTERVAL)
            else:
                await self.initialize()

    @classmethod
    def _is_blockage_error(cls, error):
        """Return True if the error's message contains any ``BLOCKAGE_MARKERS`` entry."""
        msg = getattr(error, "msg", None)
        if not isinstance(msg, str):
            msg = str(error)
        return any(marker in msg for marker in cls.BLOCKAGE_MARKERS)

    async def _run_in_pool(self, loop, ytdl_downloader, args, kwargs):
        """Run ``ytdl_downloader.extract_info(*args, **kwargs)`` on the thread pool.

        Returns ``(info, None)`` on success, or ``(None, error)`` when youtube-dl
        raised. The error is tagged ``is_error = True`` and is given a ``msg``
        attribute (``str(error)``) if it lacked one. Failures of the executor
        itself propagate normally.
        """

        def worker():
            try:
                return ytdl_downloader.extract_info(*args, **kwargs), None
            except Exception as e:
                log.debug("youtube-dl extract_info raised: %s", e)
                e.is_error = True
                if not hasattr(e, "msg"):
                    e.msg = str(e)
                return None, e

        return await loop.run_in_executor(self.thread_pool, worker)

    async def _extract(self, loop, build, rotate_ip, args, kwargs):
        """Build a downloader and extract, rotating droplets on blockage errors.

        Args:
            build: async callable returning ``(ytdl_downloader, droplet_id)``.
            rotate_ip: if True, a blockage error deletes ``droplet_id`` and the
                attempt is repeated with a newly built downloader; if False,
                blockage errors are returned like any other error.

        Returns:
            ``(info, error)`` as produced by ``_run_in_pool``.
        """
        while True:
            ytdl_downloader, droplet_id = await build()
            info, error = await self._run_in_pool(loop, ytdl_downloader, args, kwargs)
            if error is None or not rotate_ip or not self._is_blockage_error(error):
                return info, error
            log.warning(
                "Bullshit blockage error in downloader, attempting to fetch new ip..."
            )
            await self.droplet_manager.delete_droplet(droplet_id)

    async def extract_info(
        self,
        loop,
        *args,
        on_error=None,
        retry_on_error=False,
        build_clean=False,
        **kwargs
    ):
        """Run ``YoutubeDL.extract_info(*args, **kwargs)`` on the thread pool.

        Initializes the downloader on first use. Unless ``build_clean`` is set,
        each attempt uses a fresh "unsafe" downloader on a random droplet, and
        blockage errors (``BLOCKAGE_MARKERS``) delete that droplet and retry.

        Args:
            loop: event loop used for the thread-pool job and ``on_error`` dispatch.
            on_error: optional callable. If given, an extraction error is passed
                to it instead of being raised. An ``async def`` function is
                scheduled as a task; a plain callable is scheduled via
                ``loop.call_soon_threadsafe``.
            retry_on_error: with ``on_error``, retry once via ``safe_extract_info``
                after reporting the error; otherwise None is returned.
            build_clean: use ``build_clean_ytdl_downloader`` (no droplet). Blockage
                errors are then not retried, since there is no droplet to rotate.

        Returns:
            The value returned by ``YoutubeDL.extract_info``.

        Raises:
            The youtube-dl error, when extraction fails and ``on_error`` is not
            callable.
        """
        await self._wait_until_initialized()
        if build_clean:
            build, rotate_ip = self._build_clean_attempt, False
        else:
            build, rotate_ip = self._build_unsafe_attempt, True

        info, error = await self._extract(loop, build, rotate_ip, args, kwargs)
        if error is None:
            return info
        if not callable(on_error):
            raise error

        # Typically youtube_dl.utils.ExtractorError / DownloadError.
        if asyncio.iscoroutinefunction(on_error):
            asyncio.ensure_future(on_error(error), loop=loop)
        else:
            loop.call_soon_threadsafe(on_error, error)

        if retry_on_error:
            return await self.safe_extract_info(loop, *args, **kwargs)
        return None

    async def safe_extract_info(self, loop, *args, **kwargs):
        """Like ``extract_info`` but with a "safe" (``ignoreerrors=True``) downloader.

        Blockage errors rotate droplets and retry. Any other error is raised.
        """
        info, error = await self._extract(
            loop, self._build_safe_attempt, True, args, kwargs
        )
        if error is not None:
            raise error
        return info
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment