import click from pathlib import Path import tomllib import requests from urllib.parse import urljoin from operator import itemgetter import re from frozendict import frozendict class BearerAuth(requests.auth.AuthBase): def __init__(self, token): self.token = token def __call__(self, r): r.headers["authorization"] = "Bearer " + self.token return r class ABSSession(requests.Session): def __init__(self, config): super().__init__() self.base_url = config['instance'] self.auth = BearerAuth(config['api_token']) def request(self, method, url, *args, **kwargs): joined_url = urljoin(self.base_url, url) return super().request(method, joined_url, *args, **kwargs) @click.command() @click.argument('config_file', type=click.Path(dir_okay=False, path_type=Path)) def main(config_file: Path): with config_file.open('rb') as fh: config = tomllib.load(fh) with ABSSession(config) as s: libraries = s.get('/api/libraries').json()['libraries'] playlists = s.get('/api/playlists').json()['playlists'] for library_config in config['libraries']: [library] = filter(lambda l: l['name'] == library_config['name'], libraries) filtered_playlists = list(filter(lambda p: p['name'] == library_config['playlist'] and p['libraryId'] == library['id'], playlists)) def get_playlist(): playlist = None if filtered_playlists: [playlist] = filtered_playlists if not playlist: playlist = s.post('/api/playlists', json={ 'libraryId': library['id'], 'name': library_config['playlist'], }).json() return playlist podcasts = dict() items = s.get('/api/libraries/{}/items'.format(library['id'])).json()['results'] for item in items: item = s.get('/api/items/{}'.format(item['id']), json={'expanded': True}).json() episodes = list() for episode in sorted(item['media']['episodes'], key = itemgetter('publishedAt')): progress = s.get('/api/me/progress/{}/{}'.format(episode['libraryItemId'], episode['id'])) if progress.ok and progress.json()["isFinished"]: continue episodes.append(episode) podcasts[item['media']['metadata']['title']] = list(map(lambda x: frozendict({ 'libraryItemId': x['libraryItemId'], 'episodeId': x['id']}), episodes)) def lookup_podcast(expr): expr = re.compile(expr, flags=re.I) matches = filter(lambda t: expr.search(t), podcasts.keys()) match list(matches): case [x]: return (x,) case _: raise RuntimeError("No unique match for ‘{}’".format(expr)) priorities = [ [ k for item in (section if type(section) is list else [section]) for k in lookup_podcast(item) ] for section in library_config['priorities'] ] playlist_items = list() for section in priorities: while any(map(lambda item: item in podcasts, section)): for item in section: if not item in podcasts: continue if not podcasts[item]: del podcasts[item] continue playlist_items.append(podcasts[item].pop(0)) playlist = get_playlist() current_playlist_items = map(lambda item: frozendict({ k: v for k, v in item.items() if k in {'libraryItemId', 'episodeId'}}), playlist['items']) if current_playlist_items == playlist_items: continue to_remove = set(current_playlist_items) - set(playlist_items) if to_remove: s.post('/api/playlists/{}/batch/remove'.format(playlist['id']), json={'items': list(to_remove)}).raise_for_status() playlist = get_playlist() to_add = set(playlist_items) - set(current_playlist_items) if to_add: s.post('/api/playlists/{}/batch/add'.format(playlist['id']), json={'items': list(to_add)}).raise_for_status() r = s.patch('/api/playlists/{}'.format(playlist['id']), json={'items': playlist_items}).raise_for_status()