Skip to content

Instantly share code, notes, and snippets.

@bdmorin
Last active July 28, 2024 03:57
Show Gist options
  • Save bdmorin/df66bc16a40d8f15479fd711e7142633 to your computer and use it in GitHub Desktop.
Save bdmorin/df66bc16a40d8f15479fd711e7142633 to your computer and use it in GitHub Desktop.
torrentgalaxy rss manager
import re
import sqlite3
import click
import json
import os
def remove_periods(text):
    """Collapse every run of periods in *text* into a single space, then trim."""
    despaced = re.sub(r'\.+', ' ', text)
    return despaced.strip()
def lowercase_sanitize(value):
    """Return *value* lowercased; falsy values (None, '') become None."""
    if not value:
        return None
    return value.lower()
def sanitize_fields(parsed_data, fields_to_sanitize):
    """Lowercase the listed keys of *parsed_data* in place and return it.

    Keys absent from the dict are silently skipped.
    """
    for key in fields_to_sanitize:
        if key not in parsed_data:
            continue
        parsed_data[key] = lowercase_sanitize(parsed_data[key])
    return parsed_data
def parse_title(title):
    """Parse a TV torrent release title into its component fields.

    Returns a dict with keys:
      seriesname, seasonepisode, resolution, encoding, repack (always),
      episodename, network, releasegroup (only when detected).
    Returns None when the title lacks an SxxEyy marker, a resolution
    token, or an encoding tag — those three are mandatory.

    Fix over the original: the network pattern listed 'buffer' twice
    (a redundant alternative), and network was uppercased only to be
    lowercased again by the sanitize step; both removed. Output is
    unchanged.
    """

    def _clean(text):
        # Dotted release names use '.' as a word separator; collapse
        # runs of periods into single spaces and trim.
        return re.sub(r'\.+', ' ', text).strip()

    result = {}

    # REPACK flag: detect case-insensitively, then strip the token so it
    # cannot bleed into the series/episode name fields.
    repack = 'REPACK' in title.upper()
    if repack:
        title = re.sub(r'REPACK', '', title, flags=re.IGNORECASE)

    # Drop a trailing container extension if present.
    title = re.sub(r'\.(mkv|mp4)$', '', title)

    # Series name + SxxEyy marker are mandatory.
    match = re.match(r'(.*?)(S\d{2,3}E\d{2,3})', title, re.IGNORECASE)
    if not match:
        return None  # Unable to parse
    result['seriesname'] = _clean(match.group(1).strip())
    result['seasonepisode'] = match.group(2).lower()

    # Episode name: the text between SxxEyy and the resolution token.
    episodename_match = re.search(
        r'S\d{2,3}E\d{2,3}(.*?)([\s|.])(?=\d{3,4}p)', title, re.IGNORECASE)
    if episodename_match:
        episodename = episodename_match.group(1).strip()
        # Keep it only if it contains letters; a bare separator run
        # between SxxEyy and "1080p" is not a real episode name.
        if re.search(r'[a-zA-Z]', episodename):
            result['episodename'] = _clean(episodename)

    # Resolution (required), e.g. 720p / 1080p / 2160p.
    resolution_match = re.search(r'[.|\s]+(\d{3,4}p)[.|\s]+', title, re.IGNORECASE)
    if not resolution_match:
        return None  # Resolution is required
    result['resolution'] = resolution_match.group(1).lower()

    # Encoding (required): x264/x265/h264/h265, optionally dotted/spaced
    # ("H 264", "h.265"); normalize to a compact lowercase token.
    encoding_match = re.search(r'((?:x|h)[\s.]?26[45])', title, re.IGNORECASE)
    if not encoding_match:
        return None  # Encoding is required
    result['encoding'] = (
        encoding_match.group(1).replace(' ', '').replace('.', '').lower())

    # Network (optional): known streaming-source tags, stored lowercase.
    network_match = re.search(
        r'[\s|.]{1}(buffer|max|amzn|hulu|dsnp|atvp|nf|ITVX)[\s|.]{1}',
        title, re.IGNORECASE)
    if network_match:
        result['network'] = network_match.group(1).lower()

    # Release group (optional): everything after the final hyphen.
    releasegroup_match = re.search(r'(?<=\-)([^-]*)$', title)
    if releasegroup_match:
        result['releasegroup'] = _clean(releasegroup_match.group(1))

    result['repack'] = repack
    return result
def regexp(expr, item):
    """SQLite REGEXP callback: True when *expr* matches anywhere in *item*."""
    return bool(re.compile(expr).search(item))
@click.command()
@click.option('--db-path', default='tgx.sqlite3', help='Path to the SQLite database')
@click.option('--resolution', default='1080p', help='Resolution to filter (e.g., 1080p, 720p)')
def process_titles(db_path, resolution):
    """Process titles from the specified SQLite database, write results to a JSONL file, and upsert into 'shows' table."""
    # Fix over the original: the connection was closed only on the
    # success path, so any error after connect() leaked it. The close
    # now lives in a finally block.
    try:
        conn = sqlite3.connect(db_path)
        try:
            # Expose REGEXP() to SQL, backed by Python's re module.
            conn.create_function("REGEXP", 2, regexp)
            cursor = conn.cursor()

            # Create 'shows' table if it doesn't exist. The UNIQUE
            # constraint is what makes INSERT OR REPLACE an upsert.
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS shows (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                seriesname TEXT,
                seasonepisode TEXT,
                episodename TEXT,
                resolution TEXT,
                encoding TEXT,
                network TEXT,
                repack BOOLEAN DEFAULT FALSE,
                releasegroup TEXT,
                original TEXT,
                link TEXT,
                json_data TEXT,
                UNIQUE(seriesname, seasonepisode, link)
            )
            ''')

            # Pull only rows mentioning the wanted resolution AND
            # carrying an SxxEyy marker (via the REGEXP function above).
            cursor.execute("""
            SELECT title, link
            FROM torrentgalaxy
            WHERE title LIKE ?
            AND title REGEXP 'S[0-9]{2,3}E[0-9]{2,3}'
            """, (f'%{resolution}%',))
            results = cursor.fetchall()

            # JSONL output lives next to the database, same basename.
            output_file = os.path.splitext(db_path)[0] + '.jsonl'

            # Parse each title; successful parses go to both the JSONL
            # file and the 'shows' table.
            with open(output_file, 'w', encoding='utf-8') as f:
                for title, link in results:
                    parsed = parse_title(title)
                    if not parsed:
                        click.echo(f"Failed to parse: {title}")
                        continue
                    parsed['original'] = title
                    parsed['link'] = link
                    json_data = json.dumps(parsed, ensure_ascii=False)
                    f.write(json_data + '\n')
                    # Upsert: REPLACE fires on the UNIQUE(seriesname,
                    # seasonepisode, link) constraint defined above.
                    cursor.execute('''
                    INSERT OR REPLACE INTO shows
                    (seriesname, seasonepisode, episodename, resolution, encoding, network, releasegroup, original, link, json_data, repack)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        parsed.get('seriesname', ''),
                        parsed.get('seasonepisode', ''),
                        parsed.get('episodename', ''),
                        parsed.get('resolution', ''),
                        parsed.get('encoding', ''),
                        parsed.get('network', ''),
                        parsed.get('releasegroup', ''),
                        parsed['original'],
                        parsed['link'],
                        json_data,
                        parsed.get('repack', False)
                    ))
                    click.echo(f"Processed: {title}")

            conn.commit()
        finally:
            # Always release the connection, even when a step fails.
            conn.close()

        click.echo(f"\nResults written to {output_file} and upserted into 'shows' table")
    except sqlite3.Error as e:
        click.echo(f"A database error occurred: {e}", err=True)
    except IOError as e:
        click.echo(f"An I/O error occurred: {e}", err=True)
    except Exception as e:
        click.echo(f"An unexpected error occurred: {e}", err=True)
# /etc/systemd/system/hourly-job.service
[Unit]
Description=TGX Hourly Job
[Service]
Type=oneshot
ExecStart=/home/linuxbrew/.linuxbrew/bin/feed-to-sqlite /home/tgx/tgx.sqlite3 'https://torrentgalaxy.to/rss.php?cat=41' 'https://torrentgalaxy.to/rss.php?cat=42'
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/hourly-job.timer
[Unit]
Description=Run hourly job
[Timer]
OnCalendar=hourly
Persistent=true
[Install]
WantedBy=timers.target
@bdmorin
Copy link
Author

bdmorin commented Jul 28, 2024

So... I found a Homebrew utility that turns RSS feeds into SQLite, and it worked well.
Honestly, I absolutely hate the *arr apps (sonarr, radarr), and I really want a different solution. TGX doesn't want you scraping the site, but the rss feed is fair game, so I wrote this monstrosity to bust up the entries.

❯ python3 tgx.py --help
Usage: tgx.py [OPTIONS]

  Process titles from the specified SQLite database, write results to a JSONL
  file, and upsert into 'shows' table.

Options:
  --db-path TEXT     Path to the SQLite database
  --resolution TEXT  Resolution to filter (e.g., 1080p, 720p)
  --help             Show this message and exit.

$ python3 tgx.py --db-path tgx.sqlite3

So this entry

What We Do in the Shadows S05E09 A Weekend at Morrigan Manor 1080p AMZN WEB-DL DDP5 1 H 264-FLUX[TGx]

turns into:

{
  "seriesname": "What We Do in the Shadows",
  "seasonepisode": "s05e09",
  "episodename": "A Weekend at Morrigan Manor",
  "resolution": "1080p",
  "encoding": "h264",
  "network": "amzn",
  "releasegroup": "FLUX[TGx]",
  "repack": false,
  "original": "What We Do in the Shadows S05E09 A Weekend at Morrigan Manor 1080p AMZN WEB-DL DDP5 1 H 264-FLUX[TGx]",
  "link": "https://watercache.nanobytes.org/get/51bcf0c6fbddcc0a698c6a137670b56e157d83b1/What+We+Do+in+the+Shadows+S05E09+A+Weekend+at+Morrigan+Manor+1080p+AMZN+WEB-DL+DDP5+1+H+264-FLUX%5BTGx%5D"
}

So, you can easily search the JSONL file, or use the SQLite database with a web interface like Datasette

datasette-studio serve tgx.sqlite3

and you can do neato things like quickly facet columns.

or, see all the megusta and elite releases.

SELECT
*
FROM
  shows
WHERE
  LOWER(releasegroup) LIKE '%megusta%'
  or LOWER(releasegroup) LIKE '%elite%'
ORDER BY
  releasegroup,seriesname,id
LIMIT
  101

This isn't a search engine! You'll only know about stuff that you've cached from the RSS feed, so if you're looking for older entries you'll have to go to the website.

I have another process that sends current shows to my debrid service. I can make sure I get the exact resolution, encoding, and even release group.

Use at your own risk!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment