Last active
February 15, 2024 22:40
-
-
Save Xetera/425db800c3f5109506864a9acf5a7a57 to your computer and use it in GitHub Desktop.
A cross-seeding script that goes through Radarr's import history and hardlinks movies downloaded from Usenet back to their original release titles.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3
"""Cross-seed helper: walk Radarr's import history and hardlink movies
imported from usenet back to their original (pre-rename) titles."""
import requests
import os
import pathlib

# Radarr API key; required for every API call below.
RADARR_TOKEN = os.environ.get("RADARR_TOKEN")
# directory where hardlinks will be created for renaming purposes
HARDLINK_TARGET = os.environ.get("HARDLINK_TARGET")
# DRY_RUN=true to test without touching the filesystem
DRY_RUN = os.environ.get("DRY_RUN", "false").lower() == "true"
USENET_DIRECTORY = os.environ.get("USENET_DIRECTORY", None)
TORRENT_DIRECTORY = os.environ.get("TORRENT_DIRECTORY", None)
# folder mappings inside the cross-seed container as keys and the actual folder on the host as values
DOCKER_MAPPINGS = {
    # must include trailing slash
    "/data/": "/mnt/storage/"
}
if USENET_DIRECTORY is None:
    USENET_DIRECTORY = "/data/usenet"
    print("USENET_DIRECTORY is not set, using default: /data/usenet")
# Path prefixes that identify a file as usenet-originated (checked with startswith).
USENET_PATHS = [
    f"{USENET_DIRECTORY}/movies/",
    f"{USENET_DIRECTORY}/tv/",
    f"{USENET_DIRECTORY}/",
]
# Base URL of the Radarr v3 API; trailing slash on RADARR_URL is tolerated.
radarr_url = (
    f'{os.environ.get("RADARR_URL", "http://localhost:7878").rstrip("/")}/api/v3'
)
def unmap_path(path: str) -> str:
    """Translate a container-internal path into the corresponding host path.

    Applies every substitution in DOCKER_MAPPINGS (container prefix ->
    host prefix) to *path* and returns the rewritten string.
    """
    result = path
    for container_dir, host_dir in DOCKER_MAPPINGS.items():
        result = result.replace(container_dir, host_dir)
    return result
def get_total_records(api_key: str) -> int:
    """Return the total number of history records known to Radarr.

    Fetches a one-record page purely to read the ``totalRecords`` field
    from the paginated response envelope.

    :param api_key: Radarr API key, sent as the X-Api-Key header.
    """
    response = requests.get(
        f"{radarr_url}/history",
        headers={"X-Api-Key": api_key},
        params={
            "page": 0,
            "pageSize": 1,
            "sortKey": "date",
            # Radarr v3 expects "sortDirection" with "ascending"/"descending";
            # the original "sortDir": "desc" was not a recognized parameter.
            # This also matches the sibling call in process_page.
            "sortDirection": "descending",
        },
        timeout=10,
    ).json()
    return response["totalRecords"]
def process_page(
    api_key: str,
    page: int,
    grab_count: int,
    movies: dict[str, tuple[pathlib.Path, pathlib.Path]],
):
    """Scan one page of Radarr history and collect usenet imports into *movies*.

    For each "downloadFolderImported" event whose original (dropped) path lies
    under a usenet directory, stores a (hardlink target, imported file) pair
    keyed by movie id. A "movieFileDeleted" event removes any earlier entry so
    deleted files are not hardlinked later.

    :param api_key: Radarr API key.
    :param page: zero-based history page index to fetch.
    :param grab_count: page size to request.
    :param movies: accumulator dict, mutated in place.
    """
    response = requests.get(
        f"{radarr_url}/history",
        headers={"X-Api-Key": api_key},
        params={
            "page": page,
            "pageSize": grab_count,
            "sortKey": "date",
            "sortDirection": "ascending",
            # "eventType": 3,
        },
        timeout=10,
    )
    # Keep a raw dump of the last fetched page for debugging.
    with open("history.json", "w", -1, "utf8") as file:
        file.write(response.text)
    history = response.json()
    # Fixed: the original misspelled this as "hardink_folder", causing a
    # NameError on the two uses below.
    hardlink_folder = pathlib.Path(HARDLINK_TARGET)
    if not DRY_RUN:
        hardlink_folder.mkdir(exist_ok=True)
    for record in history["records"]:
        if record["eventType"] == "movieFileDeleted":
            # pop() with a default replaces the original bare `except: pass`
            # around a `del` — same effect, no swallowed unrelated errors.
            movies.pop(record["movieId"], None)
        if record["eventType"] != "downloadFolderImported":
            continue
        data = record["data"]
        originalPath = pathlib.Path(unmap_path(data["droppedPath"]))
        if not any(str(originalPath).startswith(path) for path in USENET_PATHS):
            # only usenet paths supported
            continue
        import_path = unmap_path(data["importedPath"])
        data_path = pathlib.Path(import_path)
        if not data_path.exists():
            print(f"[Warning] Data path does not exist: {data_path}")
            continue
        direct_path_fragment = str(originalPath)
        for path in USENET_PATHS:
            # Fixed: str.lstrip(path) strips a *character set*, not a prefix,
            # and could eat leading letters of the release name; removeprefix
            # removes exactly the matching directory prefix.
            direct_path_fragment = direct_path_fragment.removeprefix(path)
        direct_path = pathlib.Path(hardlink_folder, direct_path_fragment)
        if TORRENT_DIRECTORY is not None:
            torrent_dirs = [
                pathlib.Path(TORRENT_DIRECTORY, direct_path.parent.name),
                pathlib.Path(TORRENT_DIRECTORY, direct_path.name),
            ]
            if any(directory.exists() for directory in torrent_dirs):
                print(
                    f"[Warning] The imported path is also in the torrent directory: {direct_path}"
                )
                continue
        if not DRY_RUN and not direct_path.exists():
            # assuming sabnzbd always creates a parent folder for the directory
            direct_path.parent.mkdir(exist_ok=True, parents=True)
        movies[record["movieId"]] = (direct_path, data_path)
        print(f"[Info] Found previously imported: {direct_path.name}")
def movies(api_key, grab_count=100):
    """Collect every usenet-imported movie from Radarr history and hardlink it.

    Pages through the whole history, gathering (hardlink target, imported
    file) pairs, then creates the hardlinks — unless DRY_RUN is set, in
    which case nothing touches the filesystem.

    :param api_key: Radarr API key.
    :param grab_count: history page size used while paginating.
    """
    total_records = get_total_records(api_key)
    # Renamed from `movies` to avoid shadowing this function's own name.
    collected: dict[str, tuple[pathlib.Path, pathlib.Path]] = {}
    # Ceiling division: the original floor division silently skipped the
    # final partial page whenever total_records % grab_count != 0.
    total_pages = -(-total_records // grab_count)
    for page in range(total_pages):
        print(f"[Debug] Processing page: {page}")
        process_page(api_key, page, grab_count, collected)
    print("[Info] Hardlinking movies...")
    if DRY_RUN:
        # Dry run: stop before any filesystem mutation (the original bailed
        # out on the first loop iteration instead).
        return
    for direct_path, data_path in collected.values():
        try:
            direct_path.hardlink_to(data_path)
            print(f"[Info] Hardlinked: {direct_path.name}")
        except FileExistsError:
            print(f"[Info] Hardlink already exists for: {direct_path.name}")
if __name__ == "__main__": | |
print("[Debug] Starting xseed-usenet.py...") | |
movies(RADARR_TOKEN) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment