From 81cc664d4250189c9026edfb042e24c6806448ee Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Sat, 10 May 2025 12:53:58 +0200 Subject: abs-podcast-autoplaylist --- .../abs_podcast_autoplaylist/__main__.py | 107 +++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 overlays/abs-podcast-autoplaylist/abs_podcast_autoplaylist/__main__.py (limited to 'overlays/abs-podcast-autoplaylist/abs_podcast_autoplaylist/__main__.py') diff --git a/overlays/abs-podcast-autoplaylist/abs_podcast_autoplaylist/__main__.py b/overlays/abs-podcast-autoplaylist/abs_podcast_autoplaylist/__main__.py new file mode 100644 index 00000000..fd739805 --- /dev/null +++ b/overlays/abs-podcast-autoplaylist/abs_podcast_autoplaylist/__main__.py @@ -0,0 +1,107 @@ +import click +from pathlib import Path +import tomllib +import requests +from urllib.parse import urljoin +from operator import itemgetter +import re +from frozendict import frozendict + +class BearerAuth(requests.auth.AuthBase): + def __init__(self, token): + self.token = token + def __call__(self, r): + r.headers["authorization"] = "Bearer " + self.token + return r + +class ABSSession(requests.Session): + def __init__(self, config): + super().__init__() + self.base_url = config['instance'] + self.auth = BearerAuth(config['api_token']) + + def request(self, method, url, *args, **kwargs): + joined_url = urljoin(self.base_url, url) + return super().request(method, joined_url, *args, **kwargs) + +@click.command() +@click.argument('config_file', type=click.Path(dir_okay=False, path_type=Path)) +def main(config_file: Path): + with config_file.open('rb') as fh: + config = tomllib.load(fh) + + with ABSSession(config) as s: + libraries = s.get('/api/libraries').json()['libraries'] + playlists = s.get('/api/playlists').json()['playlists'] + + for library_config in config['libraries']: + [library] = filter(lambda l: l['name'] == library_config['name'], libraries) + filtered_playlists = list(filter(lambda p: p['name'] == library_config['playlist'] and p['libraryId'] == library['id'], playlists)) + def get_playlist(): + playlist = None + if filtered_playlists: + [playlist] = filtered_playlists + if not playlist: + playlist = s.post('/api/playlists', json={ + 'libraryId': library['id'], + 'name': library_config['playlist'], + }).json() + return playlist + + podcasts = dict() + items = s.get('/api/libraries/{}/items'.format(library['id'])).json()['results'] + for item in items: + item = s.get('/api/items/{}'.format(item['id']), json={'expanded': True}).json() + episodes = list() + for episode in sorted(item['media']['episodes'], key = itemgetter('publishedAt')): + progress = s.get('/api/me/progress/{}/{}'.format(episode['libraryItemId'], episode['id'])) + if progress.ok and progress.json()["isFinished"]: + continue + episodes.append(episode) + podcasts[item['media']['metadata']['title']] = list(map(lambda x: frozendict({ 'libraryItemId': x['libraryItemId'], 'episodeId': x['id']}), episodes)) + def lookup_podcast(expr): + expr = re.compile(expr, flags=re.I) + matches = filter(lambda t: expr.search(t), podcasts.keys()) + match list(matches): + case [x]: + return (x,) + case _: + raise RuntimeError("No unique match for ‘{}’".format(expr)) + + priorities = [ + [ + k + for item in (section if type(section) is list else [section]) + for k in lookup_podcast(item) + ] + for section in library_config['priorities'] + ] + + playlist_items = list() + for section in priorities: + while any(map(lambda item: item in podcasts, section)): + for item in section: + if not item in podcasts: + continue + + if not podcasts[item]: + del podcasts[item] + continue + + playlist_items.append(podcasts[item].pop(0)) + + playlist = get_playlist() + current_playlist_items = map(lambda item: frozendict({ k: v for k, v in item.items() if k in {'libraryItemId', 'episodeId'}}), playlist['items']) + + if current_playlist_items == playlist_items: + continue + + to_remove = set(current_playlist_items) - set(playlist_items) + if to_remove: + s.post('/api/playlists/{}/batch/remove'.format(playlist['id']), json={'items': list(to_remove)}).raise_for_status() + playlist = get_playlist() + to_add = set(playlist_items) - set(current_playlist_items) + if to_add: + s.post('/api/playlists/{}/batch/add'.format(playlist['id']), json={'items': list(to_add)}).raise_for_status() + + r = s.patch('/api/playlists/{}'.format(playlist['id']), json={'items': playlist_items}).raise_for_status() -- cgit v1.2.3