Created
September 20, 2022 08:57
-
-
Save blacklight/a86e853a182bd30797846a5fa3ff58d7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ~/.config/platypush/scripts/music/discovery.py | |
import logging | |
from datetime import date, timedelta | |
from platypush.context import get_plugin | |
from platypush.cron import cron | |
from scripts.music.db import ( | |
get_db_session, Track, TrackActivity, TrackSimilar, | |
DiscoveryPlaylist, DiscoveryPlaylistTrack | |
) | |
logger = logging.getLogger('music_discovery') | |
def get_suggested_tracks(days=7, limit=25):
    """
    Build the list of suggested tracks from the local music database.

    A track is a candidate when it is the target of a similarity record
    whose source track has activity within the last ``days`` days, it has
    no activity record of its own, and it is not referenced by any stored
    discovery playlist entry. Candidates are ranked by the sum of their
    similarity match scores.

    :param days: Size, in days, of the listening-history window
        (default: 7).
    :param limit: Maximum number of tracks to return (default: 25).
    :return: A list of dictionaries with the keys ``track_id``, ``artist``,
        ``title`` and ``score``, sorted by descending score.
    """
    from sqlalchemy import func

    # The activity table is joined twice, so each role gets its own alias.
    listened = TrackActivity.__table__.alias('listened_activity')
    played_target = TrackActivity.__table__.alias('suggested_activity')

    history_start = date.today() - timedelta(days=days)

    similar_to_listened = Track.__table__.join(
        TrackSimilar.__table__,
        Track.id == TrackSimilar.target_track_id,
    ).join(
        listened,
        listened.c.track_id == TrackSimilar.source_track_id,
    )

    # The two outer joins only exist so that "no matching row" can be
    # tested with IS NULL in the filter below.
    candidates = similar_to_listened.join(
        played_target,
        played_target.c.track_id == TrackSimilar.target_track_id,
        isouter=True,
    ).join(
        DiscoveryPlaylistTrack,
        Track.id == DiscoveryPlaylistTrack.track_id,
        isouter=True,
    )

    with get_db_session() as session:
        query = session.query(
            Track.id,
            func.min(Track.artist),
            func.min(Track.title),
            func.sum(TrackSimilar.match_score).label('score'),
        ).select_from(candidates)

        query = query.filter(
            # No activity is recorded for the candidate itself
            played_target.c.track_id.is_(None),
            # The candidate is not in a previous discovery playlist
            DiscoveryPlaylistTrack.track_id.is_(None),
            # Only activity inside the look-back window counts
            listened.c.created_at >= history_start,
        )

        rows = (
            query.group_by(Track.id)
            # Highest aggregate match score first
            .order_by(func.sum(TrackSimilar.match_score).desc())
            .limit(limit)
            .all()
        )

        return [
            {
                'track_id': track_id,
                'artist': artist,
                'title': title,
                'score': score,
            }
            for track_id, artist, title, score in rows
        ]
def search_remote_tracks(tracks):
    """
    Look up a list of suggested tracks on Tidal.

    Each track is searched with an "<artist> <title>" query and only the
    first result is considered. A matched track gets a ``remote_track_id``
    key set on its dictionary (the input dictionaries are updated in
    place); a track without results is logged and left out.

    :param tracks: Iterable of dictionaries providing at least the
        ``artist`` and ``title`` keys.
    :return: The list of matched track dictionaries, in input order.
    """
    # If you use Spotify instead of Tidal, simply replacing `music.tidal`
    # with `music.spotify` here should suffice.
    music = get_plugin('music.tidal')
    matched = []

    for item in tracks:
        search_query = item['artist'] + ' ' + item['title']
        logger.info('Searching "%s"', search_query)

        response = music.search(search_query, type='track', limit=1)
        hits = response.output.get('tracks', [])

        if not hits:
            logger.warning('Could not find "%s" on TIDAL', search_query)
            continue

        item['remote_track_id'] = hits[0]['id']
        matched.append(item)

    return matched
def refresh_discover_weekly():
    """
    Generate a new "Discover Weekly" playlist on Tidal and record it in the
    local database.

    Steps:

    1. Compute the suggested tracks and resolve them on Tidal; return
       early if none could be resolved.
    2. Delete every existing remote playlist whose name starts with
       "Discover Weekly" (case-insensitive), except the last one when
       sorted by ``created_at``.
    3. Create a playlist named ``Discover Weekly [<today's ISO date>]``
       and add the resolved tracks to it.
    4. Store the playlist and its tracks in the database, so that these
       tracks are excluded from future suggestions.
    """
    # If you use Spotify instead of Tidal, simply replacing `music.tidal`
    # with `music.spotify` here should suffice.
    tidal = get_plugin('music.tidal')

    # Get the latest suggested tracks
    suggestions = search_remote_tracks(get_suggested_tracks())
    if not suggestions:
        logger.info('No suggestions available')
        return

    # Retrieve the existing discovery playlists.
    # Our naming convention is that discovery playlist names start with
    # "Discover Weekly" - feel free to change it.
    playlists = tidal.get_playlists().output
    discover_playlists = sorted(
        [
            playlist for playlist in playlists
            if playlist['name'].lower().startswith('discover weekly')
        ],
        key=lambda playlist: playlist.get('created_at', 0),
    )

    # Delete all the existing discovery playlists (except the latest one).
    # We basically keep two discovery playlists at a time in our
    # collection, so you have two weeks to listen to them before they get
    # deleted. Feel free to change this logic by replacing the -1 with
    # e.g. -2, -3 etc. if you want to store more discovery playlists.
    for playlist in discover_playlists[:-1]:
        logger.info('Deleting playlist "%s"', playlist['name'])
        tidal.delete_playlist(playlist['id'])

    # Create a new discovery playlist
    playlist_name = f'Discover Weekly [{date.today().isoformat()}]'
    remote_playlist = tidal.create_playlist(playlist_name).output
    tidal.add_to_playlist(
        remote_playlist['id'],
        [track['remote_track_id'] for track in suggestions],
    )

    # Store the playlist and its entries in a single session and a single
    # commit. The generated playlist id is read right after the flush,
    # while the object is still attached to its session (reading it after
    # the session has been committed and released can fail on an expired,
    # detached instance), and nothing is committed until both the playlist
    # and its tracks have been staged.
    with get_db_session() as session:
        db_playlist = DiscoveryPlaylist(name=playlist_name)
        session.add(db_playlist)
        # Flush so that the database assigns the playlist id
        session.flush()

        session.add_all(
            [
                DiscoveryPlaylistTrack(
                    playlist_id=db_playlist.id,
                    track_id=track['track_id'],
                )
                for track in suggestions
            ]
        )
        session.commit()

    logger.info('Discover Weekly playlist updated')
@cron('0 6 * * 1')
def refresh_discover_weekly_cron(**_):
    """
    Cron entry point for the Discover Weekly refresh.

    Scheduled with the cron expression ``0 6 * * 1`` (every Monday at
    6 AM). An exception raised by the refresh is logged and then reported
    through the ``ntfy`` plugin.
    """
    try:
        refresh_discover_weekly()
    except Exception as error:
        logger.exception(error)

        # (Optional) If anything went wrong with the playlist generation,
        # send a notification over ntfy
        notifier = get_plugin('ntfy')
        notification = {
            'topic': 'mirrored-notifications-topic',
            'title': 'Discover Weekly playlist generation failed',
            'message': str(error),
            'priority': 4,
        }
        notifier.send_message(**notification)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment