Created
September 20, 2022 08:56
-
-
Save blacklight/3d50678fc1f256ee1d59d1016cc51798 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ~/.config/platypush/scripts/music/suggestions.py | |
import logging | |
from sqlalchemy import tuple_ | |
from sqlalchemy.dialects.postgresql import insert | |
from sqlalchemy.sql.expression import bindparam | |
from platypush.context import get_plugin, Variable | |
from platypush.cron import cron | |
from scripts.music.db import ( | |
get_db_session, Track, TrackActivity, TrackSimilar | |
) | |
# Logger used by every function in this script
logger = logging.getLogger('music_suggestions')

# Stored variable that remembers the ID of the latest activity for which the
# suggestions have been calculated, so that each run can resume from there
last_activity_id_var = Variable('LAST_PROCESSED_ACTIVITY_ID')
# A cronjob that runs every 5 minutes and updates the suggestions
@cron('*/5 * * * *')
def refresh_similar_tracks(**_):
    """
    Calculate the suggestions for the next batch of recently played tracks.

    The tracks played after the last processed activity ID that don't have
    any similar tracks yet are retrieved, and the first ``batch_size`` of
    them are processed. The last processed activity ID is then moved to the
    last activity of that batch, whether or not suggestions were found for it.
    """
    batch_size = 10
    last_activity_id = int(last_activity_id_var.get() or 0)

    # Retrieve all the tracks played since the latest synchronized activity ID
    # that don't have any similar tracks being calculated yet. The helper
    # opens its own database session, so none is opened here.
    recent_tracks = _get_recent_tracks_without_similars(last_activity_id)

    if recent_tracks:
        batch = recent_tracks[:batch_size]
        # The tracks are sorted by activity ID, so the last item of the batch
        # holds the most recent activity processed by this run
        last_activity_id = batch[-1]['activity_id']
        logger.info(
            'Processing suggestions for %d/%d tracks',
            len(batch),
            len(recent_tracks),
        )
        _store_suggestions(batch)
    else:
        logger.info('All the recent tracks have processed suggestions')

    last_activity_id_var.set(last_activity_id)
    logger.info('Suggestions updated')


def _store_suggestions(batch):
    """
    Look up the tracks similar to those in ``batch`` and store both the
    similar tracks and their links to the listened tracks.
    """
    # Build the track_id -> [similar_tracks] map
    similars_by_track = {
        track['track_id']: _get_similar_tracks(track['artist'], track['title'])
        for track in batch
    }

    # Map all the similar tracks in an (artist, title) -> info data structure
    similar_tracks_by_artist_and_title = \
        _get_similar_tracks_by_artist_and_title(similars_by_track)

    if not similar_tracks_by_artist_and_title:
        logger.info('No new suggestions to process')
        return

    # Sync all the new similar tracks to the database
    similar_tracks = \
        _sync_missing_similar_tracks(similar_tracks_by_artist_and_title)

    # Link listened tracks to similar tracks, skipping the suggestions that
    # have no matching record in the database
    links = []
    for track_id, similars in similars_by_track.items():
        for similar in similars or []:
            target = similar_tracks.get((similar['artist'], similar['title']))
            if target is not None:
                links.append({
                    'source_track_id': track_id,
                    'target_track_id': target.id,
                    'match_score': similar['score'],
                })

    if not links:
        # Nothing to insert: don't execute the statement without parameters
        return

    stmt = insert(TrackSimilar).values({
        'source_track_id': bindparam('source_track_id'),
        'target_track_id': bindparam('target_track_id'),
        'match_score': bindparam('match_score'),
    }).on_conflict_do_nothing()

    with get_db_session() as session:
        session.execute(stmt, links)
        session.flush()
        session.commit()
def _get_similar_tracks(artist, title):
    """
    Ask the last.fm plugin for up to 10 tracks similar to the given
    artist/title pair.

    If the lookup raises a ``pylast.PyLastError`` then a warning is logged
    and ``None`` is returned.
    """
    import pylast

    plugin = get_plugin('lastfm')

    try:
        suggestions = plugin.get_similar_tracks(
            artist=artist, title=title, limit=10
        )
    except pylast.PyLastError as err:
        logger.warning(
            'Could not find tracks similar to %s - %s: %s',
            artist, title, err
        )
        return None

    return suggestions
def _get_recent_tracks_without_similars(last_activity_id):
    """
    Return the tracks played after ``last_activity_id`` that have no
    suggestions yet, sorted by activity ID.

    Each item is a dictionary with the ``track_id``, ``artist``, ``title``
    and ``activity_id`` keys.
    """
    selected_columns = (
        Track.id.label('track_id'),
        Track.artist,
        Track.title,
        TrackActivity.id.label('activity_id'),
    )

    # The outer join keeps the tracks that have no TrackSimilar row, and the
    # IS NULL filter below then selects only those tracks
    tracks_with_similars = Track.__table__.join(
        TrackSimilar,
        Track.id == TrackSimilar.source_track_id,
        isouter=True,
    )
    joined_tables = tracks_with_similars.join(
        TrackActivity,
        Track.id == TrackActivity.track_id,
    )

    with get_db_session() as session:
        rows = (
            session.query(*selected_columns)
            .select_from(joined_tables)
            .filter(
                TrackSimilar.source_track_id.is_(None),
                TrackActivity.id > last_activity_id,
            )
            .order_by(TrackActivity.id)
            .all()
        )

        return [
            {
                'track_id': track_id,
                'artist': artist,
                'title': title,
                'activity_id': activity_id,
            }
            for track_id, artist, title, activity_id in rows
        ]
def _get_similar_tracks_by_artist_and_title(similars_by_track): | |
""" | |
Map similar tracks into an (artist, title) -> track dictionary | |
""" | |
similar_tracks_by_artist_and_title = {} | |
for similar in similars_by_track.values(): | |
for track in (similar or []): | |
similar_tracks_by_artist_and_title[ | |
(track['artist'], track['title']) | |
] = track | |
return similar_tracks_by_artist_and_title | |
def _sync_missing_similar_tracks(similar_tracks_by_artist_and_title): | |
""" | |
Flush newly calculated similar tracks to the database. | |
""" | |
logger.info('Syncing missing similar tracks') | |
with get_db_session() as session: | |
stmt = insert(Track).values({ | |
'artist': bindparam('artist'), | |
'title': bindparam('title'), | |
}).on_conflict_do_nothing() | |
session.execute(stmt, list(similar_tracks_by_artist_and_title.values())) | |
session.flush() | |
session.commit() | |
tracks = session.query(Track).filter( | |
tuple_(Track.artist, Track.title).in_( | |
similar_tracks_by_artist_and_title | |
) | |
).all() | |
return { | |
(track.artist, track.title): track | |
for track in tracks | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment