1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
import click
from pathlib import Path
import tomllib
import requests
from urllib.parse import urljoin
from operator import itemgetter
import re
from frozendict import frozendict
class BearerAuth(requests.auth.AuthBase):
    """Auth hook for ``requests`` that sends a bearer token with each request.

    Instances are callable; the request object handed to ``__call__`` gets its
    ``authorization`` header overwritten with ``"Bearer <token>"``.
    """

    def __init__(self, token):
        # Kept as a public attribute so callers can inspect or swap the token.
        self.token = token

    def __call__(self, r):
        """Set the ``authorization`` header on *r* and hand *r* back."""
        header_value = "Bearer " + self.token
        r.headers["authorization"] = header_value
        return r
class ABSSession(requests.Session):
    """``requests.Session`` bound to one server instance and API token.

    ``config`` must provide ``'instance'`` (the base URL) and ``'api_token'``.
    Every request URL is resolved against the base URL, so callers can pass
    server-relative paths.
    """

    def __init__(self, config):
        super().__init__()
        self.auth = BearerAuth(config['api_token'])
        self.base_url = config['instance']

    def request(self, method, url, *args, **kwargs):
        """Resolve *url* against ``base_url`` and delegate to ``Session.request``.

        Resolution uses ``urllib.parse.urljoin``: a *url* that starts with
        ``/`` replaces the whole path of ``base_url``, and a fully qualified
        *url* is used unchanged.
        """
        return super().request(
            method,
            urljoin(self.base_url, url),
            *args,
            **kwargs,
        )
@click.command()
@click.argument('config_file', type=click.Path(dir_okay=False, path_type=Path))
def main(config_file: Path):
with config_file.open('rb') as fh:
config = tomllib.load(fh)
with ABSSession(config) as s:
libraries = s.get('/api/libraries').json()['libraries']
playlists = s.get('/api/playlists').json()['playlists']
for library_config in config['libraries']:
[library] = filter(lambda l: l['name'] == library_config['name'], libraries)
filtered_playlists = list(filter(lambda p: p['name'] == library_config['playlist'] and p['libraryId'] == library['id'], playlists))
def get_playlist():
playlist = None
if filtered_playlists:
[playlist] = filtered_playlists
if not playlist:
playlist = s.post('/api/playlists', json={
'libraryId': library['id'],
'name': library_config['playlist'],
}).json()
return playlist
podcasts = dict()
items = s.get('/api/libraries/{}/items'.format(library['id'])).json()['results']
for item in items:
item = s.get('/api/items/{}'.format(item['id']), json={'expanded': True}).json()
episodes = list()
for episode in sorted(item['media']['episodes'], key = itemgetter('publishedAt')):
progress = s.get('/api/me/progress/{}/{}'.format(episode['libraryItemId'], episode['id']))
if progress.ok and progress.json()["isFinished"]:
continue
episodes.append(episode)
podcasts[item['media']['metadata']['title']] = list(map(lambda x: frozendict({ 'libraryItemId': x['libraryItemId'], 'episodeId': x['id']}), episodes))
def lookup_podcast(expr):
expr = re.compile(expr, flags=re.I)
matches = filter(lambda t: expr.search(t), podcasts.keys())
match list(matches):
case [x]:
return (x,)
case _:
raise RuntimeError("No unique match for ‘{}’".format(expr))
priorities = [
[
k
for item in (section if type(section) is list else [section])
for k in lookup_podcast(item)
]
for section in library_config['priorities']
]
playlist_items = list()
for section in priorities:
while any(map(lambda item: item in podcasts, section)):
for item in section:
if not item in podcasts:
continue
if not podcasts[item]:
del podcasts[item]
continue
playlist_items.append(podcasts[item].pop(0))
playlist = get_playlist()
current_playlist_items = map(lambda item: frozendict({ k: v for k, v in item.items() if k in {'libraryItemId', 'episodeId'}}), playlist['items'])
if current_playlist_items == playlist_items:
continue
to_remove = set(current_playlist_items) - set(playlist_items)
if to_remove:
s.post('/api/playlists/{}/batch/remove'.format(playlist['id']), json={'items': list(to_remove)}).raise_for_status()
playlist = get_playlist()
to_add = set(playlist_items) - set(current_playlist_items)
if to_add:
s.post('/api/playlists/{}/batch/add'.format(playlist['id']), json={'items': list(to_add)}).raise_for_status()
r = s.patch('/api/playlists/{}'.format(playlist['id']), json={'items': playlist_items}).raise_for_status()
|