Created
September 20, 2022 09:24
-
-
Save blacklight/a142145af4c6c683c5f3a4965778515c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ~/.config/platypush/scripts/music/releases.py
import html | |
import logging | |
import re | |
import threading | |
from datetime import date, timedelta | |
from typing import Iterable, List | |
from platypush.context import get_plugin | |
from platypush.cron import cron | |
from platypush.event.hook import hook | |
from platypush.message.event.rss import NewFeedEntryEvent | |
from scripts.music.db import ( | |
music_db_engine, get_db_session, NewRelease | |
) | |
# Re-entrant lock held around the db.insert call in save_new_release.
create_lock = threading.RLock()

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def _split_html_lines(content: str) -> List[str]: | |
""" | |
Utility method used to convert and split the HTML lines reported | |
by the RSS feed. | |
""" | |
return [ | |
l.strip() | |
for l in re.sub( | |
r'(</?p[^>]*>)|(<br\s*/?>)', | |
'\n', | |
content | |
).split('\n') if l | |
] | |
def _get_summary_field(title: str, lines: Iterable[str]) -> str | None: | |
""" | |
Parse the fields of a new album from the feed HTML summary. | |
""" | |
for line in lines: | |
m = re.match(rf'^{title}:\s+(.*)$', line.strip(), re.IGNORECASE) | |
if m: | |
return html.unescape(m.group(1)) | |
@hook(NewFeedEntryEvent, feed_url='https://newalbumreleases.net/category/cat/feed/')
def save_new_release(event: NewFeedEntryEvent, **_):
    """
    This hook is triggered whenever the newalbumreleases.net has new entries.

    The artist, album and style fields are parsed out of the entry summary.
    The release is upserted into the ``new_release`` table only if the
    listening history (``music_activity`` joined to ``music_track``) has at
    least one row for that artist.

    :param event: The feed entry event; only its ``summary`` is read.
    """
    # Parse artist and album.
    # NOTE(review): the ``or ''`` guards against a missing summary, which is
    # assumed to be possible - confirm against the RSS plugin.
    summary = _split_html_lines(event.summary or '')
    artist = _get_summary_field('artist', summary)
    album = _get_summary_field('album', summary)
    genre = _get_summary_field('style', summary)

    if not (artist and album):
        return

    # Check if we have listened to this artist at least once
    db = get_plugin('db')
    rows = db.select(
        engine=music_db_engine,
        query='''
            select count(*)
            from music_activity a
            join music_track t
            on a.track_id = t.id
            where artist = :artist
        ''',
        data={'artist': artist},
    ).output

    # An empty result set is treated the same as a count of zero
    num_plays = int(rows[0].get('count', 0)) if rows else 0

    # If not, skip it
    if not num_plays:
        return

    # Insert the new release on the database
    with create_lock:
        db.insert(
            engine=music_db_engine,
            table='new_release',
            records=[{
                'artist': artist,
                'album': album,
                'genre': genre,
            }],
            key_columns=('artist', 'album'),
            on_duplicate_update=True,
        )
def get_new_releases(days=7):
    """
    Retrieve the new album releases from the database.

    :param days: Look at albums releases in the past <n> days
        (default: 7)
    :return: A list of ``{'artist': ..., 'album': ...}`` dictionaries, one
        per ``NewRelease`` row created within the time window.
    """
    # Filter by recent activity
    since = date.today() - timedelta(days=days)

    with get_db_session() as session:
        rows = (
            session.query(
                NewRelease.artist,
                NewRelease.album,
            )
            .select_from(NewRelease.__table__)
            .filter(NewRelease.created_at >= since)
            .all()
        )

        return [
            {'artist': artist, 'album': album}
            for artist, album in rows
        ]
def search_tidal_new_releases(albums):
    """
    Search for Tidal albums given a list of objects with artist and title.

    :param albums: Iterable of dictionaries with ``artist`` and ``album``
        keys.
    :return: The concatenated ``tracks`` lists of the albums that were found
        and that passed the release year filter.
    """
    tidal = get_plugin('music.tidal')
    expanded_tracks = []
    current_year = date.today().year

    for album in albums:
        query = album['artist'] + ' ' + album['album']
        logger.info('Searching "%s"', query)
        results = (
            tidal.search(query, type='album', limit=1)
            .output.get('albums', [])
        )

        if not results:
            logger.warning('Could not find "%s" on TIDAL', query)
            continue

        # Keep the search hit under its own name instead of rebinding the
        # loop variable.
        found = results[0]

        # Skip search results older than a year - some new releases may
        # actually be remasters/re-releases of existing albums.
        # A missing or null year counts as 0, so such results are skipped.
        if current_year - (found.get('year') or 0) > 1:
            continue

        expanded_tracks += (
            tidal.get_album(found['id'])
            .output.get('tracks', [])
        )

    return expanded_tracks
def refresh_release_radar():
    """
    Build a new dated "New Releases" playlist on Tidal.

    The tracks come from ``search_tidal_new_releases(get_new_releases())``.
    If there are none, nothing is created or deleted. Otherwise all existing
    playlists whose name starts with "new releases" (case-insensitive) are
    deleted except the most recently created one, and a new playlist named
    after today's date is created and filled with the tracks.
    """
    tidal = get_plugin('music.tidal')

    # Get the latest releases
    tracks = search_tidal_new_releases(get_new_releases())
    if not tracks:
        logger.info('No new releases found')
        return

    # Retrieve the existing new releases playlists, oldest first
    playlists = tidal.get_playlists().output
    new_releases_playlists = sorted(
        (
            pl for pl in playlists
            if pl['name'].lower().startswith('new releases')
        ),
        key=lambda pl: pl.get('created_at', 0),
    )

    # Delete all the existing new releases playlists
    # (except the latest one)
    for playlist in new_releases_playlists[:-1]:
        logger.info('Deleting playlist "%s"', playlist['name'])
        tidal.delete_playlist(playlist['id'])

    # Create a new releases playlist
    playlist_name = f'New Releases [{date.today().isoformat()}]'
    playlist_id = tidal.create_playlist(playlist_name).output['id']

    tidal.add_to_playlist(
        playlist_id,
        [t['id'] for t in tracks],
    )
@cron('0 7 * * 1')
def refresh_release_radar_cron(**_):
    """
    This cron will execute every Monday at 7 AM.

    It runs ``refresh_release_radar``. If that raises, the exception is
    logged and a notification is sent through the ``ntfy`` plugin; the
    original exception is not re-raised.
    """
    try:
        refresh_release_radar()
    except Exception as e:
        # Top-level boundary of the scheduled job: log and notify
        logger.exception(e)
        get_plugin('ntfy').send_message(
            topic='mirrored-notifications-topic',
            title='Release Radar playlist generation failed',
            message=str(e),
            priority=4,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment