Last active
July 28, 2024 03:57
-
-
Save bdmorin/df66bc16a40d8f15479fd711e7142633 to your computer and use it in GitHub Desktop.
torrentgalaxy rss manager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import sqlite3 | |
import click | |
import json | |
import os | |
def remove_periods(text):
    """Collapse every run of '.' in *text* into a single space and trim ends."""
    collapsed = re.sub(r'\.+', ' ', text)
    return collapsed.strip()
def lowercase_sanitize(value):
    """Return *value* lower-cased; falsy values (None, '') become None."""
    if not value:
        return None
    return value.lower()
def sanitize_fields(parsed_data, fields_to_sanitize):
    """Lower-case the listed fields of *parsed_data* in place and return it.

    Fields absent from the dict are skipped; falsy values become None.
    """
    for key in fields_to_sanitize:
        if key not in parsed_data:
            continue
        value = parsed_data[key]
        parsed_data[key] = value.lower() if value else None
    return parsed_data
def parse_title(title):
    r"""Parse a TV-release title into its component fields.

    Returns a dict with 'seriesname', 'seasonepisode', 'resolution',
    'encoding' and 'repack' (always present on success), plus
    'episodename', 'network' and 'releasegroup' when detectable.
    Returns None when the title lacks a SxxEyy marker, a resolution,
    or an encoding.

    Fixes over the previous revision:
      * separator classes were written as [\s|.] / [.|\s] — inside a
        character class '|' is a literal, so those also matched a pipe
        character; they now match only dot/whitespace as intended;
      * the duplicated 'buffer' alternative in the network pattern was
        removed;
      * values are stored in their final lower-case form directly, so
        the upper-then-lower round-trip through sanitize_fields is gone
        (the resulting dict is unchanged).
    """

    def _collapse_dots(text):
        # Runs of '.' act as word separators in release names.
        return re.sub(r'\.+', ' ', text).strip()

    result = {}

    # Detect and strip a REPACK tag anywhere in the title.
    repack = 'REPACK' in title.upper()
    if repack:
        title = re.sub(r'REPACK', '', title, flags=re.IGNORECASE)

    # Remove a trailing container extension if present.
    title = re.sub(r'\.(mkv|mp4)$', '', title)

    # Series name is everything before the SxxEyy marker; both required.
    match = re.match(r'(.*?)(S\d{2,3}E\d{2,3})', title, re.IGNORECASE)
    if not match:
        return None  # Unable to parse
    result['seriesname'] = _collapse_dots(match.group(1).strip())
    result['seasonepisode'] = match.group(2).lower()

    # Episode name (optional): text between SxxEyy and the resolution token.
    episodename_match = re.search(
        r'S\d{2,3}E\d{2,3}(.*?)[\s.](?=\d{3,4}p)', title, re.IGNORECASE)
    if episodename_match:
        episodename = episodename_match.group(1).strip()
        # Require at least one letter so bare separator runs aren't kept.
        if re.search(r'[a-zA-Z]', episodename):
            result['episodename'] = _collapse_dots(episodename)

    # Resolution (required), e.g. 720p / 1080p / 2160p.
    resolution_match = re.search(r'[.\s]+(\d{3,4}p)[.\s]+', title, re.IGNORECASE)
    if resolution_match:
        result['resolution'] = resolution_match.group(1).lower()
    else:
        return None  # Resolution is required

    # Encoding (required), normalised to e.g. 'x264' / 'h265'.
    encoding_match = re.search(r'((?:x|h)[\s.]?26[45])', title, re.IGNORECASE)
    if encoding_match:
        result['encoding'] = (encoding_match.group(1)
                              .replace(' ', '').replace('.', '').lower())
    else:
        return None  # Encoding is required

    # Network (optional), stored lower-case.
    network_match = re.search(
        r'[\s.](buffer|max|amzn|hulu|dsnp|atvp|nf|itvx)[\s.]',
        title, re.IGNORECASE)
    if network_match:
        result['network'] = network_match.group(1).lower()

    # Release group (optional): everything after the last hyphen.
    releasegroup_match = re.search(r'(?<=-)([^-]*)$', title)
    if releasegroup_match:
        result['releasegroup'] = _collapse_dots(releasegroup_match.group(1))

    result['repack'] = repack
    return result
def regexp(expr, item):
    """SQLite REGEXP callback: True when pattern *expr* matches *item*."""
    # re.search caches compiled patterns internally, so this is equivalent
    # to compiling first and searching.
    return bool(re.search(expr, item))
@click.command()
@click.option('--db-path', default='tgx.sqlite3', help='Path to the SQLite database')
@click.option('--resolution', default='1080p', help='Resolution to filter (e.g., 1080p, 720p)')
def process_titles(db_path, resolution):
    """Process titles from the specified SQLite database, write results to a JSONL file, and upsert into 'shows' table."""
    try:
        # Connect to the SQLite database.  The inner try/finally guarantees
        # the connection is closed even when a later step raises — the
        # previous revision leaked it on error because the except handlers
        # swallowed the exception with the connection still open.
        conn = sqlite3.connect(db_path)
        try:
            # SQLite has no built-in REGEXP; register our Python callback.
            conn.create_function("REGEXP", 2, regexp)
            cursor = conn.cursor()
            # Create 'shows' table if it doesn't exist.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS shows (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    seriesname TEXT,
                    seasonepisode TEXT,
                    episodename TEXT,
                    resolution TEXT,
                    encoding TEXT,
                    network TEXT,
                    repack BOOLEAN DEFAULT FALSE,
                    releasegroup TEXT,
                    original TEXT,
                    link TEXT,
                    json_data TEXT,
                    UNIQUE(seriesname, seasonepisode, link)
                )
            ''')
            # Select titles at the requested resolution that carry a
            # SxxEyy episode marker (parameterized LIKE avoids injection).
            cursor.execute("""
                SELECT title, link
                FROM torrentgalaxy
                WHERE title LIKE ?
                AND title REGEXP 'S[0-9]{2,3}E[0-9]{2,3}'
            """, (f'%{resolution}%',))
            results = cursor.fetchall()
            # JSONL output sits next to the database file.
            output_file = os.path.splitext(db_path)[0] + '.jsonl'
            # Process each title, write to JSONL file, and upsert into 'shows'.
            with open(output_file, 'w', encoding='utf-8') as f:
                for title, link in results:
                    parsed = parse_title(title)
                    if parsed:
                        parsed['original'] = title
                        parsed['link'] = link
                        json_data = json.dumps(parsed, ensure_ascii=False)
                        f.write(json_data + '\n')
                        # INSERT OR REPLACE keyed on the
                        # UNIQUE(seriesname, seasonepisode, link) constraint.
                        cursor.execute('''
                            INSERT OR REPLACE INTO shows
                            (seriesname, seasonepisode, episodename, resolution, encoding, network, releasegroup, original, link, json_data, repack)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        ''', (
                            parsed.get('seriesname', ''),
                            parsed.get('seasonepisode', ''),
                            parsed.get('episodename', ''),
                            parsed.get('resolution', ''),
                            parsed.get('encoding', ''),
                            parsed.get('network', ''),
                            parsed.get('releasegroup', ''),
                            parsed['original'],
                            parsed['link'],
                            json_data,
                            parsed.get('repack', False)
                        ))
                        click.echo(f"Processed: {title}")
                    else:
                        click.echo(f"Failed to parse: {title}")
            conn.commit()
        finally:
            conn.close()
        click.echo(f"\nResults written to {output_file} and upserted into 'shows' table")
    except sqlite3.Error as e:
        click.echo(f"A database error occurred: {e}", err=True)
    except IOError as e:
        click.echo(f"An I/O error occurred: {e}", err=True)
    except Exception as e:
        # CLI boundary: report anything unexpected rather than traceback.
        click.echo(f"An unexpected error occurred: {e}", err=True)
if __name__ == '__main__':
    process_titles()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /etc/systemd/system/hourly-job.service
# Oneshot job: pulls the TorrentGalaxy RSS feeds (cat=41 and cat=42) into
# the SQLite database via feed-to-sqlite.  Scheduled by hourly-job.timer.
[Unit]
Description=TGX Hourly Job
[Service]
# oneshot: runs to completion on each activation; no long-lived daemon.
Type=oneshot
ExecStart=/home/linuxbrew/.linuxbrew/bin/feed-to-sqlite /home/tgx/tgx.sqlite3 'https://torrentgalaxy.to/rss.php?cat=41' 'https://torrentgalaxy.to/rss.php?cat=42'
[Install]
# NOTE(review): WantedBy=multi-user.target also runs this once at boot in
# addition to the timer activations — confirm that is intended.
WantedBy=multi-user.target
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /etc/systemd/system/hourly-job.timer
# Activates the unit with the matching name (hourly-job.service).
[Unit]
Description=Run hourly job
[Timer]
# Fire at the top of every hour; Persistent=true runs a missed activation
# (e.g. the machine was off) as soon as the timer is next started.
OnCalendar=hourly
Persistent=true
[Install]
WantedBy=timers.target
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
So.. I found a brew utility that turns rss into sqlite and it worked well.
Honestly, I absolutely hate the *arr apps (sonarr, radarr), and I really want a different solution. TGX doesn't want you scraping the site, but the RSS feed is fair game, so I wrote this monstrosity to break up the entries.
So this entry
turns into:
So, you can easily search the JSONL file or use the SQLite database with a web interface like Datasette
and you can do neato things like quickly facet columns.
or, see all the megusta and elite releases.
This isn't a search engine! You'll only know about stuff that you've cached from the RSS feed, so if you're looking for older entries you'll have to go to the website.
I have another process that sends current shows to my debrid service. I can make sure I get the exact resolution, encoding, and even release group.
Use at your own risk!